aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/cache/cachedisklayer.cpp
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp25
1 files changed, 10 insertions, 15 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 91bd9cba8..d80da6ea6 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (auto& BucketPath : FoundBucketDirectories)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
const std::string BucketName = PathToUtf8(BucketPath.stem());
try
{
@@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
}
});
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
bool
@@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush()
}
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
try
{
for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
Bucket->Flush();
@@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush()
{
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- while (!WorkLatch.Wait(1000))
- {
- ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
- }
+ Work.Wait(1000,
+ [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); });
}
}