aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp28
-rw-r--r--src/zenutil/chunkedcontent.cpp2
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h39
-rw-r--r--src/zenutil/parallelwork.cpp31
4 files changed, 63 insertions, 37 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index 2171f4d62..e5e8db8d2 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -65,21 +65,23 @@ public:
m_PendingBackgroundWorkCount.AddCount(1);
try
{
- m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
- auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
- if (!m_CancelBackgroundWork)
- {
- try
- {
- Work();
- }
- catch (const std::exception& Ex)
+ m_BackgroundWorkPool.ScheduleWork(
+ [this, Work = std::move(Work)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
+ auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
+ if (!m_CancelBackgroundWork)
{
- ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ try
+ {
+ Work();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ }
}
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
catch (const std::exception& Ex)
{
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index cd1bf7dd7..4f2aad95f 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -805,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t PathIndex : Order)
{
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index 639c6968c..05146d644 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -13,7 +13,7 @@ namespace zen {
class ParallelWork
{
public:
- ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode);
~ParallelWork();
@@ -26,21 +26,23 @@ public:
m_PendingWork.AddCount(1);
try
{
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
- auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
- try
- {
- while (m_PauseFlag && !m_AbortFlag)
+ WorkerPool.ScheduleWork(
+ [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
+ try
{
- Sleep(2000);
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
+ Work(m_AbortFlag);
}
- Work(m_AbortFlag);
- }
- catch (...)
- {
- OnError(std::current_exception(), m_AbortFlag);
- }
- });
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ },
+ m_Mode);
}
catch (const std::exception&)
{
@@ -63,10 +65,11 @@ private:
ExceptionCallback DefaultErrorFunction();
void RethrowErrors();
- std::atomic<bool>& m_AbortFlag;
- std::atomic<bool>& m_PauseFlag;
- bool m_DispatchComplete = false;
- Latch m_PendingWork;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ const WorkerThreadPool::EMode m_Mode;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
RwLock m_ErrorLock;
std::vector<std::exception_ptr> m_Errors;
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index a571d1d11..95417078a 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -15,9 +15,10 @@
namespace zen {
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
: m_AbortFlag(AbortFlag)
, m_PauseFlag(PauseFlag)
+, m_Mode(Mode)
, m_PendingWork(1)
{
}
@@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
Work.Wait();
}
@@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
@@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 10; I++)
{
Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
@@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
@@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch")
}
}
+TEST_CASE("parallellwork.limitqueue")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ Sleep(10);
+ });
+ }
+ Work.Wait();
+}
+
void
parallellwork_forcelink()
{