From 573907447db3e19d49c0bcaf3f659cf2d599c738 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 16 Nov 2023 18:50:27 +0100 Subject: blocking queue fix (#550) * make BlockingQueue::m_CompleteAdding non-atomic * ZenCacheDiskLayer::Flush logging * name worker threads in ZenCacheDiskLayer::DiscoverBuckets * name worker threads in gcv2 * improved logging in ZenServerInstance * scrub threadpool naming * remove waitpid handling, we should just call wait to kill zombie processes --- src/zenserver/cache/cachedisklayer.cpp | 43 ++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 15 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index a6cb54444..afb974d76 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -3210,7 +3210,7 @@ ZenCacheDiskLayer::DiscoverBuckets() const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - WorkerThreadPool Pool(WorkerThreadPoolCount); + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { @@ -3300,9 +3300,17 @@ void ZenCacheDiskLayer::Flush() { std::vector Buckets; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Buckets.empty()) + { + return; + } + ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); { - RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope __(m_Lock); if (m_Buckets.empty()) { return; @@ -3314,21 +3322,26 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount); - Latch WorkLatch(1); - for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush"); + Latch WorkLatch(1); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); + } + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); + } } - WorkLatch.CountDown(); - WorkLatch.Wait(); } void -- cgit v1.2.3