diff options
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 25 |
1 files changed, 10 insertions, 15 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 91bd9cba8..d80da6ea6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (auto& BucketPath : FoundBucketDirectories) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { @@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } bool @@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush() } { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); try { for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&WorkLatch, Bucket]() { + Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { Bucket->Flush(); @@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } + Work.Wait(1000, + [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); } } |