diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-16 13:17:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-16 13:17:54 +0200 |
| commit | d000167e12c6dde651ef86be9f67552291ff1b7d (patch) | |
| tree | 17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenstore/compactcas.cpp | |
| parent | fix build store range check (#437) (diff) | |
| download | zen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip | |
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork
* add asserts to Latch usage to catch usage errors
* extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 137 |
1 files changed, 72 insertions, 65 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2ab5752ff..b00abb2cb 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -396,89 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas return m_BlockStore.IterateBlock( FoundChunkLocations, ChunkIndexes, - [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) { if (Data == nullptr) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer()); } return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); }, LargeSizeLimit); }; - std::atomic<bool> AsyncContinue = true; + std::atomic<bool> AbortFlag; { - std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - const bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &Work, - &AsyncContinue, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, - &AsyncContinue, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - AsyncContinue.store(false); - } - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = - DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - AsyncContinue.store(false); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - if (!Continue) + try { - AsyncContinue.store(false); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AbortFlag, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AbortFlag.store(true); + } + }); + return !AbortFlag.load(); + } + else + { + if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes)) + { + AbortFlag.store(true); + } + return !AbortFlag.load(); + } + }); + if (!Continue) + { + AbortFlag.store(true); + } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what()); + } + Work.Wait(); } - return AsyncContinue.load(); + return !AbortFlag.load(); } void |