aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/cache
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerThreadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work being queued up. UE-305498
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp7
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp104
2 files changed, 63 insertions, 48 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index cacbbd966..fd52cdab5 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4215,7 +4215,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& BucketPath : FoundBucketDirectories)
@@ -4387,7 +4387,7 @@ ZenCacheDiskLayer::Flush()
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& Bucket : Buckets)
@@ -4434,7 +4434,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
+ WorkerThreadPool::EMode::EnableBacklog));
# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 1f2d6c37f..3f27e6d21 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic_uint32_t AddedChunkCount = 0;
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (AddedChunkCount.load() < NewChunks.size())
{
@@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket")
CHECK(Value1.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropBucket(Namespace, Bucket));
@@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace")
CHECK(Value4.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropNamespace(Namespace1));