diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-16 13:17:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-16 13:17:54 +0200 |
| commit | d000167e12c6dde651ef86be9f67552291ff1b7d (patch) | |
| tree | 17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenstore/filecas.cpp | |
| parent | fix build store range check (#437) (diff) | |
| download | zen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip | |
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork
* add asserts to Latch usage to catch usage errors
* extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 11a266f1c..68644be2d 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -666,52 +666,64 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) + try { - if (!AsyncContinue) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - break; - } - size_t ChunkIndex = FoundChunkIndexes[Index]; - uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; - if (OptionalWorkerPool) - { - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - AsyncContinue.store(false); - } - if (!AsyncContinue) - { - return; - } - try - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + break; + } + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; + if (OptionalWorkerPool) + { + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { AsyncContinue.store(false); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", - m_RootDirectory, - ChunkHashes[ChunkIndex], - Ex.what()); - AsyncContinue.store(false); - } - }); - } - else - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + if (!AsyncContinue) + { + return; + } + try + { + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); + AsyncContinue.store(false); + } + }); + } + else { - AsyncContinue.store(false); + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } } } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating chunks in {}. Reason: '{}'", this->m_RootDirectory, Ex.what()); + } Work.Wait(); } return AsyncContinue.load(); |