aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/cache/structuredcachestore.cpp
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp104
1 files changed, 59 insertions, 45 deletions
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 1f2d6c37f..3f27e6d21 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic_uint32_t AddedChunkCount = 0;
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (AddedChunkCount.load() < NewChunks.size())
{
@@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket")
CHECK(Value1.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropBucket(Namespace, Bucket));
@@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace")
CHECK(Value4.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropNamespace(Namespace1));