aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-24 11:58:09 +0200
committerGitHub Enterprise <[email protected]>2025-10-24 11:58:09 +0200
commitb8fc9040370d6faf119eea3d43f49d6470ea3498 (patch)
tree657b534e25747933e2023d58abf1868bfc32b656 /src/zenstore/compactcas.cpp
parentin-tree spdlog (#602) (diff)
downloadzen-b8fc9040370d6faf119eea3d43f49d6470ea3498.tar.xz
zen-b8fc9040370d6faf119eea3d43f49d6470ea3498.zip
refactor CasContainerStrategy::IterateOneBlock to make it more readable (#607)
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp187
1 files changed, 96 insertions, 91 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index bf843171e..26af5b4b1 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -339,6 +339,60 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
+CasContainerStrategy::IterateOneBlock(const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ uint64_t LargeSizeLimit,
+ std::span<const IoHash> ChunkHashes,
+ std::span<const size_t> FoundChunkIndexes,
+ std::span<const BlockStoreLocation> FoundChunkLocations,
+ std::span<const size_t> ChunkIndexes)
+{
+ if (ChunkIndexes.size() < 4)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
+ if (!Chunk)
+ {
+ ZEN_WARN("Failed to fetch chunk {} from block {}, Offset {}, Size {}",
+ ChunkHashes[OuterIndex],
+ FoundChunkLocations[ChunkIndex].BlockIndex,
+ FoundChunkLocations[ChunkIndex].Offset,
+ FoundChunkLocations[ChunkIndex].Size);
+ }
+ if (!AsyncCallback(OuterIndex, Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ return m_BlockStore.IterateBlock(
+ FoundChunkLocations,
+ ChunkIndexes,
+ [this, ChunkHashes, &AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
+ if (Data == nullptr)
+ {
+ ZEN_WARN("Failed to fetch chunk {}, Size {}", ChunkHashes[OuterIndex], Size);
+ return AsyncCallback(OuterIndex, IoBuffer());
+ }
+ return AsyncCallback(OuterIndex, IoBuffer(IoBuffer::Wrap, Data, Size));
+ },
+ [this, ChunkHashes, &AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
+ IoBuffer Chunk = File.GetChunk(Offset, Size);
+ if (!Chunk)
+ {
+ ZEN_WARN("Failed to fetch chunk {} from '{}', Offset {}, Size {}", ChunkHashes[OuterIndex], File.GetPath(), Offset, Size);
+ }
+ return AsyncCallback(OuterIndex, Chunk);
+ },
+ LargeSizeLimit);
+}
+
+bool
CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
@@ -384,63 +438,6 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
return true;
}
- auto DoOneBlock = [this, &ChunkHashes](const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- uint64_t LargeSizeLimit,
- std::span<const size_t> FoundChunkIndexes,
- std::span<const BlockStoreLocation> FoundChunkLocations,
- std::span<const size_t> ChunkIndexes) {
- if (ChunkIndexes.size() < 4)
- {
- for (size_t ChunkIndex : ChunkIndexes)
- {
- size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
- IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
- if (!Chunk)
- {
- ZEN_WARN("Failed to fetch chunk {} from block {}, Offset {}, Size {}",
- ChunkHashes[OuterIndex],
- FoundChunkLocations[ChunkIndex].BlockIndex,
- FoundChunkLocations[ChunkIndex].Offset,
- FoundChunkLocations[ChunkIndex].Size);
- }
- if (!AsyncCallback(OuterIndex, Chunk))
- {
- return false;
- }
- }
- return true;
- }
- return m_BlockStore.IterateBlock(
- FoundChunkLocations,
- ChunkIndexes,
- [this, &ChunkHashes, AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) {
- size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
- if (Data == nullptr)
- {
- ZEN_WARN("Failed to fetch chunk {}, Size {}", ChunkHashes[OuterIndex], Size);
- return AsyncCallback(OuterIndex, IoBuffer());
- }
- return AsyncCallback(OuterIndex, IoBuffer(IoBuffer::Wrap, Data, Size));
- },
- [this, &ChunkHashes, AsyncCallback, FoundChunkIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) {
- size_t OuterIndex = FoundChunkIndexes[ChunkIndex];
- IoBuffer Chunk = File.GetChunk(Offset, Size);
- if (!Chunk)
- {
- ZEN_WARN("Failed to fetch chunk {} from '{}', Offset {}, Size {}",
- ChunkHashes[OuterIndex],
- File.GetPath(),
- Offset,
- Size);
- }
- return AsyncCallback(OuterIndex, Chunk);
- },
- LargeSizeLimit);
- };
-
std::atomic<bool> AbortFlag;
{
std::atomic<bool> PauseFlag;
@@ -454,50 +451,58 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
&AbortFlag,
&AsyncCallback,
LargeSizeLimit,
- DoOneBlock,
+ ChunkHashes,
&FoundChunkIndexes,
&FoundChunkLocations,
OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
{
std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- try
- {
- bool Continue =
- DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AbortFlag.store(true);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- AbortFlag.store(true);
- }
- });
+ Work.ScheduleWork(*OptionalWorkerPool,
+ [this,
+ &AsyncCallback,
+ LargeSizeLimit,
+ BlockIndex,
+ ChunkHashes,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue = IterateOneBlock(AsyncCallback,
+ LargeSizeLimit,
+ ChunkHashes,
+ FoundChunkIndexes,
+ FoundChunkLocations,
+ ChunkIndexes);
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AbortFlag.store(true);
+ }
+ });
return !AbortFlag.load();
}
else
{
- if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes))
+ if (!IterateOneBlock(AsyncCallback,
+ LargeSizeLimit,
+ ChunkHashes,
+ FoundChunkIndexes,
+ FoundChunkLocations,
+ ChunkIndexes))
{
AbortFlag.store(true);
}