diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenutil/include | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenutil/include')
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 39 |
1 files changed, 21 insertions, 18 deletions
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index 639c6968c..05146d644 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -13,7 +13,7 @@ namespace zen { class ParallelWork { public: - ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); ~ParallelWork(); @@ -26,21 +26,23 @@ public: m_PendingWork.AddCount(1); try { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { - auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); - try - { - while (m_PauseFlag && !m_AbortFlag) + WorkerPool.ScheduleWork( + [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try { - Sleep(2000); + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); } - Work(m_AbortFlag); - } - catch (...) - { - OnError(std::current_exception(), m_AbortFlag); - } - }); + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }, + m_Mode); } catch (const std::exception&) { @@ -63,10 +65,11 @@ private: ExceptionCallback DefaultErrorFunction(); void RethrowErrors(); - std::atomic<bool>& m_AbortFlag; - std::atomic<bool>& m_PauseFlag; - bool m_DispatchComplete = false; - Latch m_PendingWork; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + const WorkerThreadPool::EMode m_Mode; + bool m_DispatchComplete = false; + Latch m_PendingWork; RwLock m_ErrorLock; std::vector<std::exception_ptr> m_Errors; |