From 49701314f570da3622f11eb37cc889c7d39d9a93 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 19 May 2025 22:25:58 +0200 Subject: handle exception with batch work (#401) * use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files --- src/zenstore/compactcas.cpp | 128 ++++++++++++++++++++++++-------------------- 1 file changed, 70 insertions(+), 58 deletions(-) (limited to 'src/zenstore/compactcas.cpp') diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 8cf241e34..15bea272b 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span ChunkHas LargeSizeLimit); }; - Latch WorkLatch(1); - std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - WorkLatch.CountDown(); - WorkLatch.Wait(); - return AsyncContinue.load() && Continue; + std::atomic AsyncContinue = true; + { + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic& AbortFlag) { + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + } + }); + if (!Continue) + { + AsyncContinue.store(false); + } + Work.Wait(); + } + return AsyncContinue.load(); } void -- cgit v1.2.3