aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-16 13:17:54 +0200
committerGitHub Enterprise <[email protected]>2025-06-16 13:17:54 +0200
commitd000167e12c6dde651ef86be9f67552291ff1b7d (patch)
tree17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenstore/filecas.cpp
parentfix build store range check (#437) (diff)
downloadzen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz
zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp86
1 files changed, 49 insertions, 37 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 11a266f1c..68644be2d 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -666,52 +666,64 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
+ try
{
- if (!AsyncContinue)
+ for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- break;
- }
- size_t ChunkIndex = FoundChunkIndexes[Index];
- uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
- if (OptionalWorkerPool)
- {
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- AsyncContinue.store(false);
- }
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ if (AbortFlag)
+ {
+ AsyncContinue.store(false);
+ }
+ if (!AsyncContinue)
+ {
+ break;
+ }
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
+ if (OptionalWorkerPool)
+ {
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
AsyncContinue.store(false);
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
- m_RootDirectory,
- ChunkHashes[ChunkIndex],
- Ex.what());
- AsyncContinue.store(false);
- }
- });
- }
- else
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
+ }
+ else
{
- AsyncContinue.store(false);
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
}
}
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating chunks in {}. Reason: '{}'", this->m_RootDirectory, Ex.what());
+ }
Work.Wait();
}
return AsyncContinue.load();