aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/compactcas.cpp
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp128
1 files changed, 70 insertions, 58 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 8cf241e34..15bea272b 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -15,6 +15,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#include <gsl/gsl-lite.hpp>
@@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
LargeSizeLimit);
};
- Latch WorkLatch(1);
- std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- }
- });
- WorkLatch.CountDown();
- WorkLatch.Wait();
- return AsyncContinue.load() && Continue;
+ std::atomic<bool> AsyncContinue = true;
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ AsyncContinue.store(false);
+ }
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ }
+ });
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ Work.Wait();
+ }
+ return AsyncContinue.load();
}
void