diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zencore/jobqueue.cpp | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zencore/jobqueue.cpp')
| -rw-r--r-- | src/zencore/jobqueue.cpp | 68 |
1 files changed, 36 insertions, 32 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 5d727b69c..4aa8c113e 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -109,10 +109,12 @@ public: WorkerCounter.AddCount(1); try { - WorkerPool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); - Worker(); - }); + WorkerPool.ScheduleWork( + [&]() { + auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); + Worker(); + }, + WorkerThreadPool::EMode::EnableBacklog); return {.Id = NewJobId}; } catch (const std::exception& Ex) @@ -466,34 +468,36 @@ TEST_CASE("JobQueue") for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); - Pool.ScheduleWork([&Queue, &JobsLatch, I]() { - auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - JobsLatch.AddCount(1); - auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { - auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep", "", 100, 100); - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep again", "", 100, 50); - if ((I & 0xFF) == 0x10) - { - zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); - } - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("done", "", 100, 0); - }); - }); + Pool.ScheduleWork( + [&Queue, &JobsLatch, I]() { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + JobsLatch.AddCount(1); + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { + auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep", "", 100, 100); + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep again", "", 100, 50); + if ((I & 0xFF) == 0x10) + { + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); + } + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("done", "", 100, 0); + }); + }, + WorkerThreadPool::EMode::EnableBacklog); } auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string { |