aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/parallelwork.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 12:38:35 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 12:38:35 +0200
commit5361ee1c77b68bb14237169660840d6d63a74892 (patch)
tree3ad259133e09485a14506be38e43ec5b62a050f2 /src/zencore/parallelwork.cpp
parentmove chunking code to zenremotestore lib (#545) (diff)
downloadzen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz
zen-5361ee1c77b68bb14237169660840d6d63a74892.zip
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp * remove dependency to zenutil/workerpools.h from buildstoragecache.cpp * remove unneded include * move jupiter helpers to zenremotestore * move parallelwork to zencore * remove zenutil dependency from zenremotestore * clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zencore/parallelwork.cpp')
-rw-r--r--src/zencore/parallelwork.cpp264
1 files changed, 264 insertions, 0 deletions
diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp
new file mode 100644
index 000000000..d86d5815f
--- /dev/null
+++ b/src/zencore/parallelwork.cpp
@@ -0,0 +1,264 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/parallelwork.h>
+
+#include <zencore/callstack.h>
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#include <typeinfo>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
+: m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_Mode(Mode)
+, m_PendingWork(1)
+{
+}
+
+ParallelWork::~ParallelWork()
+{
+ try
+ {
+ if (!m_DispatchComplete)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ ZEN_WARN(
+ "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
+ "to complete");
+ m_PendingWork.CountDown();
+ m_DispatchComplete = true;
+ }
+ const bool WaitSucceeded = m_PendingWork.Wait();
+ const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (!WaitSucceeded)
+ {
+ ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork);
+ }
+ if (RemainingWork != 0)
+ {
+ void* Frames[8];
+ uint32_t FrameCount = GetCallstack(2, 8, Frames);
+ CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
+ auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); });
+ ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork);
+
+ uint32_t WaitedMs = 0;
+ while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
+ {
+ Sleep(50);
+ WaitedMs += 50;
+ }
+ ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining();
+ if (RemainingWorkAfterSafetyWait != 0)
+ {
+ ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}",
+ RemainingWork,
+ RemainingWorkAfterSafetyWait,
+ NiceLatencyNs(WaitedMs * 1000000u),
+ CallstackToString(Callstack, " "))
+ }
+ else
+ {
+ ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}",
+ RemainingWork,
+ NiceLatencyNs(WaitedMs * 1000000u),
+ CallstackToString(Callstack, " "));
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what());
+ }
+}
+
+ParallelWork::ExceptionCallback
+ParallelWork::DefaultErrorFunction()
+{
+ return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ AbortFlag = true;
+ };
+}
+
+void
+ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_DispatchComplete = true;
+
+ while (!m_PendingWork.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::Wait()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_DispatchComplete = true;
+
+ const bool WaitSucceeded = m_PendingWork.Wait();
+ const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (!WaitSucceeded)
+ {
+ ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork);
+ }
+ else if (RemainingWork != 0)
+ {
+ ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork);
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::RethrowErrors()
+{
+ if (!m_Errors.empty())
+ {
+ if (m_Errors.size() > 1)
+ {
+ ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:");
+ auto It = m_Errors.begin() + 1;
+ while (It != m_Errors.end())
+ {
+ try
+ {
+ std::rethrow_exception(*It);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_INFO(" {}", Ex.what());
+ }
+ It++;
+ }
+ }
+ std::exception_ptr Ex = m_Errors.front();
+ m_Errors.clear();
+ std::rethrow_exception(Ex);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("parallellwork.nowork")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.basic")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
+ }
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.throws_in_work")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ for (uint32_t I = 0; I < 10; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ if (I > 3)
+ {
+ throw std::runtime_error("We throw in async thread");
+ }
+ else
+ {
+ Sleep(10);
+ }
+ });
+ }
+ CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
+}
+
+TEST_CASE("parallellwork.throws_in_dispatch")
+{
+ WorkerThreadPool WorkerPool(2);
+ std::atomic<uint32_t> ExecutedCount;
+ try
+ {
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ ExecutedCount++;
+ });
+ if (I == 3)
+ {
+ throw std::runtime_error("We throw in dispatcher thread");
+ }
+ }
+ CHECK(false);
+ }
+ catch (const std::runtime_error& Ex)
+ {
+ CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
+ CHECK_LE(ExecutedCount.load(), 4);
+ }
+}
+
+TEST_CASE("parallellwork.limitqueue")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ Sleep(10);
+ });
+ }
+ Work.Wait();
+}
+
+void
+parallellwork_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen