aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-25 13:36:53 +0100
committerGitHub Enterprise <[email protected]>2024-11-25 13:36:53 +0100
commite75c5860277681be7b4a18d8d630f76c051719b4 (patch)
tree52533fe3c0e60cfe89e08ff5a4be00b215933670 /src/zenstore/compactcas.cpp
parentadd missing projectstore expire time in gc log (#227) (diff)
downloadzen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz
zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip
stronger validation of payload existance (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload - Use validation of payload size/existance in all chunk fetch operations in file cas - In project store oplog validate, make sure we can reach all the payloads - Add threading to oplog validate request
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp28
1 files changed, 26 insertions, 2 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b3309f7a7..bc30301d1 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -307,6 +307,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool)
{
+ if (ChunkHashes.size() < 3)
+ {
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ IoBuffer Chunk = FindChunk(ChunkHashes[ChunkIndex]);
+ if (!AsyncCallback(ChunkIndex, Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
std::vector<size_t> FoundChunkIndexes;
std::vector<BlockStoreLocation> FoundChunkLocations;
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -326,7 +338,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
- return true;
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
}
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
},
@@ -338,7 +350,19 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
Latch WorkLatch(1);
std::atomic_bool AsyncContinue = true;
bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool)
+ if (ChunkIndexes.size() < 3)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
+ if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else if (OptionalWorkerPool)
{
WorkLatch.AddCount(1);
OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {