aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/parallelwork.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenutil/parallelwork.cpp
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenutil/parallelwork.cpp')
-rw-r--r--src/zenutil/parallelwork.cpp31
1 files changed, 26 insertions, 5 deletions
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index a571d1d11..95417078a 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -15,9 +15,10 @@
namespace zen {
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
: m_AbortFlag(AbortFlag)
, m_PauseFlag(PauseFlag)
+, m_Mode(Mode)
, m_PendingWork(1)
{
}
@@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
Work.Wait();
}
@@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
@@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 10; I++)
{
Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
@@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
@@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch")
}
}
+TEST_CASE("parallellwork.limitqueue")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ Sleep(10);
+ });
+ }
+ Work.Wait();
+}
+
void
parallellwork_forcelink()
{