diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/cache/cachedisklayer.cpp | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 25 |
1 files changed, 10 insertions, 15 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 91bd9cba8..d80da6ea6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (auto& BucketPath : FoundBucketDirectories) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { @@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } bool @@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush() } { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); try { for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&WorkLatch, Bucket]() { + Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { Bucket->Flush(); @@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } + Work.Wait(1000, + [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); } } |