diff options
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 28 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 39 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 31 |
4 files changed, 63 insertions, 37 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index 2171f4d62..e5e8db8d2 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -65,21 +65,23 @@ public: m_PendingBackgroundWorkCount.AddCount(1); try { - m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); - auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); - if (!m_CancelBackgroundWork) - { - try - { - Work(); - } - catch (const std::exception& Ex) + m_BackgroundWorkPool.ScheduleWork( + [this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) { - ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } } - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& Ex) { diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index cd1bf7dd7..4f2aad95f 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -805,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t PathIndex : Order) { diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index 639c6968c..05146d644 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -13,7 +13,7 @@ namespace zen { class ParallelWork { public: - ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); ~ParallelWork(); @@ -26,21 +26,23 @@ public: m_PendingWork.AddCount(1); try { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { - auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); - try - { - while (m_PauseFlag && !m_AbortFlag) + WorkerPool.ScheduleWork( + [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try { - Sleep(2000); + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); } - Work(m_AbortFlag); - } - catch (...) - { - OnError(std::current_exception(), m_AbortFlag); - } - }); + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }, + m_Mode); } catch (const std::exception&) { @@ -63,10 +65,11 @@ private: ExceptionCallback DefaultErrorFunction(); void RethrowErrors(); - std::atomic<bool>& m_AbortFlag; - std::atomic<bool>& m_PauseFlag; - bool m_DispatchComplete = false; - Latch m_PendingWork; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + const WorkerThreadPool::EMode m_Mode; + bool m_DispatchComplete = false; + Latch m_PendingWork; RwLock m_ErrorLock; std::vector<std::exception_ptr> m_Errors; diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index a571d1d11..95417078a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -15,9 +15,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode) : m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) +, m_Mode(Mode) , m_PendingWork(1) { } @@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork") { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); Work.Wait(); } @@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic") std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); @@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work") std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { @@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch") { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { @@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch") } } +TEST_CASE("parallellwork.limitqueue") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + Sleep(10); + }); + } + Work.Wait(); +} + void parallellwork_forcelink() { |