diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/compactcas.cpp | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 128 |
1 files changed, 70 insertions, 58 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 8cf241e34..15bea272b 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -15,6 +15,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #include <gsl/gsl-lite.hpp> @@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas LargeSizeLimit); }; - Latch WorkLatch(1); - std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - WorkLatch.CountDown(); - WorkLatch.Wait(); - return AsyncContinue.load() && Continue; + std::atomic<bool> AsyncContinue = true; + { + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + } + }); + if (!Continue) + { + AsyncContinue.store(false); + } + Work.Wait(); + } + return AsyncContinue.load(); } void |