aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/filecas.cpp
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp70
1 files changed, 40 insertions, 30 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 4911bffb9..6354edf70 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -20,6 +20,7 @@
#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
}
}
- std::atomic_bool Continue = true;
+ std::atomic<bool> AsyncContinue = true;
if (!FoundChunkIndexes.empty())
{
- auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ ZEN_ASSERT(ChunkIndex < ChunkHashes.size());
const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
if (!AsyncCallback(ChunkIndex, std::move(Payload)))
@@ -645,49 +647,57 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
return true;
};
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- size_t ChunkIndex = FoundChunkIndexes[Index];
- uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
- if (!Continue)
+ if (!AsyncContinue)
{
break;
}
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
if (OptionalWorkerPool)
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!Continue)
- {
- return;
- }
- try
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- Continue = false;
+ AsyncContinue.store(false);
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
- m_RootDirectory,
- ChunkHashes[ChunkIndex],
- Ex.what());
- }
- });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
}
else
{
- Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
}
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
- return Continue;
+ return AsyncContinue.load();
}
void