aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-06 11:38:56 +0100
committerGitHub <[email protected]>2023-11-06 11:38:56 +0100
commitba972a77cb75facefd3f0da962cf34ea5a391884 (patch)
tree9e0fe9ad41dd6ba20b966065eaa9f1dc214ab531 /src/zenserver/cache/cachedisklayer.cpp
parentkeep a "null" iobuffer core to reduce redundant memory allocations (#507) (diff)
downloadzen-ba972a77cb75facefd3f0da962cf34ea5a391884.tar.xz
zen-ba972a77cb75facefd3f0da962cf34ea5a391884.zip
multithread cache bucket (#508)
* Multithread init and flush of cache bucket * tweaked threading cound for bucket discovery, disklayer flush and gc v2
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp87
1 files changed, 66 insertions, 21 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index fa80ed414..c0efdc76d 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -945,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+ ZEN_INFO("Flushing bucket {}", m_BucketDir);
+
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
@@ -3018,6 +3020,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
// Initialize buckets
std::vector<std::filesystem::path> BadBucketDirectories;
+ std::vector<std::filesystem::path> FoundBucketDirectories;
RwLock::ExclusiveLockScope _(m_Lock);
@@ -3037,26 +3040,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult =
- m_Buckets.emplace(BucketName,
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
- CacheBucket& Bucket = *InsertResult.first->second;
-
- try
- {
- if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-
- m_Buckets.erase(InsertResult.first);
- continue;
- }
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
+ FoundBucketDirectories.push_back(BucketPath);
ZEN_INFO("Discovered bucket '{}'", BucketName);
}
@@ -3081,6 +3065,53 @@ ZenCacheDiskLayer::DiscoverBuckets()
ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
}
}
+
+ RwLock SyncLock;
+
+ const size_t MaxHwTreadUse = std::thread::hardware_concurrency();
+ const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size()));
+
+ WorkerThreadPool Pool(WorkerThreadPoolCount);
+ Latch WorkLatch(1);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
+ {
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
+
+ CacheBucket* Bucket = nullptr;
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
bool
@@ -3133,6 +3164,10 @@ ZenCacheDiskLayer::Flush()
{
RwLock::SharedLockScope _(m_Lock);
+ if (m_Buckets.empty())
+ {
+ return;
+ }
Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
@@ -3140,11 +3175,21 @@ ZenCacheDiskLayer::Flush()
Buckets.push_back(Bucket);
}
}
+ const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
+ const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
+ WorkerThreadPool Pool(WorkerThreadPoolCount);
+ Latch WorkLatch(1);
for (auto& Bucket : Buckets)
{
- Bucket->Flush();
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ Bucket->Flush();
+ });
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
void