aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenutil/include
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h39
1 files changed, 21 insertions, 18 deletions
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index 639c6968c..05146d644 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -13,7 +13,7 @@ namespace zen {
class ParallelWork
{
public:
- ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode);
~ParallelWork();
@@ -26,21 +26,23 @@ public:
m_PendingWork.AddCount(1);
try
{
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
- auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
- try
- {
- while (m_PauseFlag && !m_AbortFlag)
+ WorkerPool.ScheduleWork(
+ [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
+ try
{
- Sleep(2000);
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
+ Work(m_AbortFlag);
}
- Work(m_AbortFlag);
- }
- catch (...)
- {
- OnError(std::current_exception(), m_AbortFlag);
- }
- });
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ },
+ m_Mode);
}
catch (const std::exception&)
{
@@ -63,10 +65,11 @@ private:
ExceptionCallback DefaultErrorFunction();
void RethrowErrors();
- std::atomic<bool>& m_AbortFlag;
- std::atomic<bool>& m_PauseFlag;
- bool m_DispatchComplete = false;
- Latch m_PendingWork;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ const WorkerThreadPool::EMode m_Mode;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
RwLock m_ErrorLock;
std::vector<std::exception_ptr> m_Errors;