aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/jobqueue.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zencore/jobqueue.cpp
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zencore/jobqueue.cpp')
-rw-r--r--src/zencore/jobqueue.cpp68
1 files changed, 36 insertions, 32 deletions
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 5d727b69c..4aa8c113e 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -109,10 +109,12 @@ public:
WorkerCounter.AddCount(1);
try
{
- WorkerPool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
- Worker();
- });
+ WorkerPool.ScheduleWork(
+ [&]() {
+ auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
+ Worker();
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return {.Id = NewJobId};
}
catch (const std::exception& Ex)
@@ -466,34 +468,36 @@ TEST_CASE("JobQueue")
for (uint32_t I = 0; I < 100; I++)
{
JobsLatch.AddCount(1);
- Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
- auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- JobsLatch.AddCount(1);
- auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
- auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep", "", 100, 100);
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep again", "", 100, 50);
- if ((I & 0xFF) == 0x10)
- {
- zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
- }
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("done", "", 100, 0);
- });
- });
+ Pool.ScheduleWork(
+ [&Queue, &JobsLatch, I]() {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ JobsLatch.AddCount(1);
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
+ auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep", "", 100, 100);
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep again", "", 100, 50);
+ if ((I & 0xFF) == 0x10)
+ {
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
+ }
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("done", "", 100, 0);
+ });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {