diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/cache/structuredcachestore.cpp | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 104 |
1 files changed, 59 insertions, 45 deletions
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 1f2d6c37f..3f27e6d21 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - std::string Bucket = Chunk.second.Bucket; - IoHash ChunkHash = Chunk.first; - ZenCacheValue CacheValue; + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; - CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); - IoHash Hash = IoHash::HashBuffer(CacheValue.Value); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (AddedChunkCount.load() < NewChunks.size()) { @@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket") CHECK(Value1.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropBucket(Namespace, Bucket)); @@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace") CHECK(Value4.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - Value2.Value = IoBuffer{}; - Value3.Value = IoBuffer{}; - Value4.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + Value2.Value = IoBuffer{}; + Value3.Value = IoBuffer{}; + Value4.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropNamespace(Namespace1)); |