aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/parallelwork.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/parallelwork.cpp')
-rw-r--r--src/zenutil/parallelwork.cpp192
1 files changed, 192 insertions, 0 deletions
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
new file mode 100644
index 000000000..516d70e28
--- /dev/null
+++ b/src/zenutil/parallelwork.cpp
@@ -0,0 +1,192 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/parallelwork.h>
+
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#include <typeinfo>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1)
+{
+}
+
+ParallelWork::~ParallelWork()
+{
+ try
+ {
+ if (!m_DispatchComplete)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ ZEN_WARN(
+ "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
+ "to complete");
+ m_PendingWork.CountDown();
+ }
+ m_AbortFlag.store(true);
+ m_PendingWork.Wait();
+ ZEN_ASSERT(m_PendingWork.Remaining() == 0);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what());
+ }
+}
+
+ParallelWork::ExceptionCallback
+ParallelWork::DefaultErrorFunction()
+{
+ return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ AbortFlag = true;
+ };
+}
+
+void
+ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+
+ while (!m_PendingWork.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::Wait()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_PendingWork.Wait();
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::RethrowErrors()
+{
+ if (!m_Errors.empty())
+ {
+ if (m_Errors.size() > 1)
+ {
+ ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:");
+ auto It = m_Errors.begin() + 1;
+ while (It != m_Errors.end())
+ {
+ try
+ {
+ std::rethrow_exception(*It);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_INFO(" {}", Ex.what());
+ }
+ It++;
+ }
+ }
+ std::exception_ptr Ex = m_Errors.front();
+ m_Errors.clear();
+ std::rethrow_exception(Ex);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("parallellwork.nowork")
+{
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.basic")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
+ }
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.throws_in_work")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 10; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ if (I > 3)
+ {
+ throw std::runtime_error("We throw in async thread");
+ }
+ else
+ {
+ Sleep(10);
+ }
+ });
+ }
+ CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
+}
+
+TEST_CASE("parallellwork.throws_in_dispatch")
+{
+ WorkerThreadPool WorkerPool(2);
+ std::atomic<uint32_t> ExecutedCount;
+ try
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ ExecutedCount++;
+ });
+ if (I == 3)
+ {
+ throw std::runtime_error("We throw in dispatcher thread");
+ }
+ }
+ CHECK(false);
+ }
+ catch (const std::runtime_error& Ex)
+ {
+ CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
+ CHECK_LE(ExecutedCount.load(), 4);
+ }
+}
+
+void
+parallellwork_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen