diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/filecas.cpp | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 70 |
1 file changed, 40 insertions(+), 30 deletions(-)
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 4911bffb9..6354edf70 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -20,6 +20,7 @@
 #include <zencore/workthreadpool.h>
 #include <zenstore/gc.h>
 #include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>

 #if ZEN_WITH_TESTS
 #	include <zencore/compactbinarybuilder.h>
@@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
 			}
 		}
 	}
-	std::atomic_bool Continue = true;
+	std::atomic<bool> AsyncContinue = true;
 	if (!FoundChunkIndexes.empty())
 	{
-		auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+		auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+			ZEN_ASSERT(ChunkIndex < ChunkHashes.size());
 			const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
 			IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
 			if (!AsyncCallback(ChunkIndex, std::move(Payload)))
@@ -645,49 +647,57 @@
 			return true;
 		};
-		Latch WorkLatch(1);
+		std::atomic<bool> AbortFlag;
+		ParallelWork Work(AbortFlag);
 		for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
 		{
-			size_t ChunkIndex = FoundChunkIndexes[Index];
-			uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
-			if (!Continue)
+			if (!AsyncContinue)
 			{
 				break;
 			}
+			size_t ChunkIndex = FoundChunkIndexes[Index];
+			uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
 			if (OptionalWorkerPool)
 			{
-				WorkLatch.AddCount(1);
-				OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
-					auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
-					if (!Continue)
-					{
-						return;
-					}
-					try
-					{
-						if (!ProcessOne(ChunkIndex, ExpectedSize))
+				Work.ScheduleWork(
+					*OptionalWorkerPool,
+					[this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+						if (AbortFlag)
 						{
-							Continue = false;
+							AsyncContinue.store(false);
 						}
-					}
-					catch (const std::exception& Ex)
-					{
-						ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
-								 m_RootDirectory,
-								 ChunkHashes[ChunkIndex],
-								 Ex.what());
-					}
-				});
+						if (!AsyncContinue)
+						{
+							return;
+						}
+						try
+						{
+							if (!ProcessOne(ChunkIndex, ExpectedSize))
+							{
+								AsyncContinue.store(false);
+							}
+						}
+						catch (const std::exception& Ex)
+						{
+							ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+									 m_RootDirectory,
+									 ChunkHashes[ChunkIndex],
+									 Ex.what());
+							AsyncContinue.store(false);
+						}
+					});
 			}
 			else
 			{
-				Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
+				if (!ProcessOne(ChunkIndex, ExpectedSize))
+				{
+					AsyncContinue.store(false);
+				}
 			}
 		}
-		WorkLatch.CountDown();
-		WorkLatch.Wait();
+		Work.Wait();
 	}
-	return Continue;
+	return AsyncContinue.load();
 }

 void