diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-24 11:58:09 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-24 11:58:09 +0200 |
| commit | b8fc9040370d6faf119eea3d43f49d6470ea3498 (patch) | |
| tree | 657b534e25747933e2023d58abf1868bfc32b656 /src/zenstore/compactcas.cpp | |
| parent | in-tree spdlog (#602) (diff) | |
| download | zen-b8fc9040370d6faf119eea3d43f49d6470ea3498.tar.xz zen-b8fc9040370d6faf119eea3d43f49d6470ea3498.zip | |
refactor CasContainerStrategy::IterateOneBlock to make it more readable (#607)
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 187 |
1 files changed, 96 insertions, 91 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index bf843171e..26af5b4b1 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -339,6 +339,60 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) } bool +CasContainerStrategy::IterateOneBlock(const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + uint64_t LargeSizeLimit, + std::span<const IoHash> ChunkHashes, + std::span<const size_t> FoundChunkIndexes, + std::span<const BlockStoreLocation> FoundChunkLocations, + std::span<const size_t> ChunkIndexes) +{ + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); + if (!Chunk) + { + ZEN_WARN("Failed to fetch chunk {} from block {}, Offset {}, Size {}", + ChunkHashes[OuterIndex], + FoundChunkLocations[ChunkIndex].BlockIndex, + FoundChunkLocations[ChunkIndex].Offset, + FoundChunkLocations[ChunkIndex].Size); + } + if (!AsyncCallback(OuterIndex, Chunk)) + { + return false; + } + } + return true; + } + + return m_BlockStore.IterateBlock( + FoundChunkLocations, + ChunkIndexes, + [this, ChunkHashes, &AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) { + size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; + if (Data == nullptr) + { + ZEN_WARN("Failed to fetch chunk {}, Size {}", ChunkHashes[OuterIndex], Size); + return AsyncCallback(OuterIndex, IoBuffer()); + } + return AsyncCallback(OuterIndex, IoBuffer(IoBuffer::Wrap, Data, Size)); + }, + [this, ChunkHashes, &AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; + IoBuffer Chunk = File.GetChunk(Offset, Size); + if (!Chunk) + { + ZEN_WARN("Failed to fetch chunk {} from '{}', Offset {}, Size {}", ChunkHashes[OuterIndex], File.GetPath(), Offset, Size); + } + return AsyncCallback(OuterIndex, Chunk); + }, + LargeSizeLimit); +} + +bool CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, @@ -384,63 +438,6 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas return true; } - auto DoOneBlock = [this, &ChunkHashes](const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - uint64_t LargeSizeLimit, - std::span<const size_t> FoundChunkIndexes, - std::span<const BlockStoreLocation> FoundChunkLocations, - std::span<const size_t> ChunkIndexes) { - if (ChunkIndexes.size() < 4) - { - for (size_t ChunkIndex : ChunkIndexes) - { - size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; - IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); - if (!Chunk) - { - ZEN_WARN("Failed to fetch chunk {} from block {}, Offset {}, Size {}", - ChunkHashes[OuterIndex], - FoundChunkLocations[ChunkIndex].BlockIndex, - FoundChunkLocations[ChunkIndex].Offset, - FoundChunkLocations[ChunkIndex].Size); - } - if (!AsyncCallback(OuterIndex, Chunk)) - { - return false; - } - } - return true; - } - return m_BlockStore.IterateBlock( - FoundChunkLocations, - ChunkIndexes, - [this, &ChunkHashes, AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) { - size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; - if (Data == nullptr) - { - ZEN_WARN("Failed to fetch chunk {}, Size {}", ChunkHashes[OuterIndex], Size); - return AsyncCallback(OuterIndex, IoBuffer()); - } - return AsyncCallback(OuterIndex, IoBuffer(IoBuffer::Wrap, Data, Size)); - }, - [this, &ChunkHashes, AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, - BlockStoreFile& File, - uint64_t Offset, - uint64_t Size) { - size_t OuterIndex = FoundChunkIndexes[ChunkIndex]; - IoBuffer Chunk = File.GetChunk(Offset, Size); - if (!Chunk) - { - ZEN_WARN("Failed to fetch chunk {} from '{}', Offset {}, Size {}", - ChunkHashes[OuterIndex], - File.GetPath(), - Offset, - Size); - } - return AsyncCallback(OuterIndex, Chunk); - }, - LargeSizeLimit); - }; - std::atomic<bool> AbortFlag; { std::atomic<bool> PauseFlag; @@ -454,50 +451,58 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas &AbortFlag, &AsyncCallback, LargeSizeLimit, - DoOneBlock, + ChunkHashes, &FoundChunkIndexes, &FoundChunkLocations, OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) { std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - try - { - bool Continue = - DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AbortFlag.store(true); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - AbortFlag.store(true); - } - }); + Work.ScheduleWork(*OptionalWorkerPool, + [this, + &AsyncCallback, + LargeSizeLimit, + BlockIndex, + ChunkHashes, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + try + { + bool Continue = IterateOneBlock(AsyncCallback, + LargeSizeLimit, + ChunkHashes, + FoundChunkIndexes, + FoundChunkLocations, + ChunkIndexes); + if (!Continue) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AbortFlag.store(true); + } + }); return !AbortFlag.load(); } else { - if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes)) + if (!IterateOneBlock(AsyncCallback, + LargeSizeLimit, + ChunkHashes, + FoundChunkIndexes, + FoundChunkLocations, + ChunkIndexes)) { AbortFlag.store(true); } |