From ba972a77cb75facefd3f0da962cf34ea5a391884 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 6 Nov 2023 11:38:56 +0100 Subject: multithread cache bucket (#508) * Multithread init and flush of cache bucket * tweaked threading cound for bucket discovery, disklayer flush and gc v2 --- src/zenserver/cache/cachedisklayer.cpp | 87 ++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 21 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index fa80ed414..c0efdc76d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -945,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::Flush() } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + ZEN_INFO("Flushing bucket {}", m_BucketDir); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); @@ -3018,6 +3020,7 @@ ZenCacheDiskLayer::DiscoverBuckets() // Initialize buckets std::vector BadBucketDirectories; + std::vector FoundBucketDirectories; RwLock::ExclusiveLockScope _(m_Lock); @@ -3037,26 +3040,7 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } + FoundBucketDirectories.push_back(BucketPath); ZEN_INFO("Discovered bucket '{}'", BucketName); } @@ -3081,6 +3065,53 @@ ZenCacheDiskLayer::DiscoverBuckets() ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); } } + + RwLock SyncLock; + + const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); + const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, FoundBucketDirectories.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); + for (auto& BucketPath : FoundBucketDirectories) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try + { + std::unique_ptr NewBucket = + std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); + + CacheBucket* Bucket = nullptr; + { + RwLock::ExclusiveLockScope __(SyncLock); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); } bool @@ -3133,6 +3164,10 @@ ZenCacheDiskLayer::Flush() { RwLock::SharedLockScope _(m_Lock); + if (m_Buckets.empty()) + { + return; + } Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { @@ -3140,11 +3175,21 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); for (auto& Bucket : Buckets) { - Bucket->Flush(); + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); } + WorkLatch.CountDown(); + WorkLatch.Wait(); } void -- cgit v1.2.3