aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-16 13:17:54 +0200
committerGitHub Enterprise <[email protected]>2025-06-16 13:17:54 +0200
commitd000167e12c6dde651ef86be9f67552291ff1b7d (patch)
tree17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenstore/compactcas.cpp
parentfix build store range check (#437) (diff)
downloadzen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz
zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp137
1 files changed, 72 insertions, 65 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 2ab5752ff..b00abb2cb 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -396,89 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
return m_BlockStore.IterateBlock(
FoundChunkLocations,
ChunkIndexes,
- [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
}
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
},
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
},
LargeSizeLimit);
};
- std::atomic<bool> AsyncContinue = true;
+ std::atomic<bool> AbortFlag;
{
- std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- const bool Continue = m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [this,
- &Work,
- &AsyncContinue,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this,
- &AsyncContinue,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- AsyncContinue.store(false);
- }
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue =
- DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- AsyncContinue.store(false);
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- }
- });
- if (!Continue)
+ try
{
- AsyncContinue.store(false);
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AbortFlag,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AbortFlag.store(true);
+ }
+ });
+ return !AbortFlag.load();
+ }
+ else
+ {
+ if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes))
+ {
+ AbortFlag.store(true);
+ }
+ return !AbortFlag.load();
+ }
+ });
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what());
+ }
+
Work.Wait();
}
- return AsyncContinue.load();
+ return !AbortFlag.load();
}
void