From 339668ac935f781c06225d2d685642e27348772b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 10 Sep 2025 16:38:33 +0200 Subject: add EMode to WorkerTheadPool to avoid thread starvation (#492) - Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498 --- src/zenutil/parallelwork.cpp | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index a571d1d11..95417078a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -15,9 +15,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic& AbortFlag, std::atomic& PauseFlag) +ParallelWork::ParallelWork(std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool::EMode Mode) : m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) +, m_Mode(Mode) , m_PendingWork(1) { } @@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork") { std::atomic AbortFlag; std::atomic PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); Work.Wait(); } @@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic") std::atomic AbortFlag; std::atomic PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); @@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work") std::atomic AbortFlag; std::atomic PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { @@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch") { std::atomic AbortFlag; std::atomic PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { @@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch") } } +TEST_CASE("parallellwork.limitqueue") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + Sleep(10); + }); + } + Work.Wait(); +} + void parallellwork_forcelink() { -- cgit v1.2.3