diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-06 11:38:56 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-06 11:38:56 +0100 |
| commit | ba972a77cb75facefd3f0da962cf34ea5a391884 (patch) | |
| tree | 9e0fe9ad41dd6ba20b966065eaa9f1dc214ab531 /src/zenserver/cache/cachedisklayer.cpp | |
| parent | keep a "null" iobuffer core to reduce redundant memory allocations (#507) (diff) | |
| download | zen-ba972a77cb75facefd3f0da962cf34ea5a391884.tar.xz zen-ba972a77cb75facefd3f0da962cf34ea5a391884.zip | |
multithread cache bucket (#508)
* Multithread init and flush of cache bucket
* tweaked threading cound for bucket discovery, disklayer flush and gc v2
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 87 |
1 files changed, 66 insertions, 21 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index fa80ed414..c0efdc76d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -945,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::Flush() } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + ZEN_INFO("Flushing bucket {}", m_BucketDir); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); @@ -3018,6 +3020,7 @@ ZenCacheDiskLayer::DiscoverBuckets() // Initialize buckets std::vector<std::filesystem::path> BadBucketDirectories; + std::vector<std::filesystem::path> FoundBucketDirectories; RwLock::ExclusiveLockScope _(m_Lock); @@ -3037,26 +3040,7 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } + FoundBucketDirectories.push_back(BucketPath); ZEN_INFO("Discovered bucket '{}'", BucketName); } @@ -3081,6 +3065,53 @@ ZenCacheDiskLayer::DiscoverBuckets() ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); } } + + RwLock SyncLock; + + const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); + const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); + for (auto& BucketPath : FoundBucketDirectories) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try + { + std::unique_ptr<CacheBucket> NewBucket = + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); + + CacheBucket* Bucket = nullptr; + { + RwLock::ExclusiveLockScope __(SyncLock); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); } bool @@ -3133,6 +3164,10 @@ ZenCacheDiskLayer::Flush() { RwLock::SharedLockScope _(m_Lock); + if (m_Buckets.empty()) + { + return; + } Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { @@ -3140,11 +3175,21 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); for (auto& Bucket : Buckets) { - Bucket->Flush(); + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); } + WorkLatch.CountDown(); + WorkLatch.Wait(); } void |