aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/parallelwork.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencore/parallelwork.cpp')
-rw-r--r--src/zencore/parallelwork.cpp156
1 files changed, 156 insertions, 0 deletions
diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp
index 94696f479..ec00fe0bc 100644
--- a/src/zencore/parallelwork.cpp
+++ b/src/zencore/parallelwork.cpp
@@ -2,6 +2,7 @@
#include <zencore/parallelwork.h>
+#include <zencore/assertfmt.h>
#include <zencore/callstack.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
@@ -11,6 +12,8 @@
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
+
+# include <thread>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -90,6 +93,65 @@ ParallelWork::DefaultErrorFunction()
}
void
+ParallelWork::RecordExternalError(std::exception_ptr Ex)
+{
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ m_AbortFlag = true;
+}
+
+ParallelWork::ExternalWorkToken
+ParallelWork::RegisterExternal()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_PendingWork.AddCount(1);
+ return ExternalWorkToken(this);
+}
+
+void
+ParallelWork::ExternalWorkToken::Complete()
+{
+ ZEN_ASSERT(m_Owner != nullptr);
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+}
+
+void
+ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex)
+{
+ ZEN_ASSERT(m_Owner != nullptr);
+ // Null exception_ptr would propagate as std::bad_exception via
+ // rethrow_exception(nullptr) and mask the real failure mode. Catches
+ // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); })
+ // firing on a normal-return path where no exception is in flight.
+ ZEN_ASSERT(Ex != nullptr);
+ m_Owner->RecordExternalError(Ex);
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+}
+
+void
+ParallelWork::ExternalWorkToken::Release()
+{
+ if (m_Owner != nullptr)
+ {
+ // Tests should fail loudly so that any leaked path surfaces immediately;
+ // in production we keep the safety-net countdown so a leak does not deadlock
+ // the latch but log it as an error rather than a warning - this is always
+ // a programming bug.
+#if ZEN_WITH_TESTS
+ ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()");
+#else
+ ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net");
+ // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success.
+ m_Owner->RecordExternalError(
+ std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()")));
+#endif
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+ }
+}
+
+void
ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
{
ZEN_ASSERT(!m_DispatchComplete);
@@ -257,6 +319,100 @@ TEST_CASE("parallellwork.limitqueue")
Work.Wait();
}
+TEST_CASE("parallellwork.external_basic")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::vector<ParallelWork::ExternalWorkToken> Tokens;
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Tokens.push_back(Work.RegisterExternal());
+ }
+ for (auto& Token : Tokens)
+ {
+ Token.Complete();
+ }
+
+ Work.Wait();
+ CHECK_FALSE(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_completes_from_other_thread")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ auto Token = Work.RegisterExternal();
+ std::thread Worker([Token = std::move(Token)]() mutable {
+ Sleep(20);
+ Token.Complete();
+ });
+
+ Work.Wait();
+ Worker.join();
+ CHECK_FALSE(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_fail_propagates_exception")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ auto Token = Work.RegisterExternal();
+ try
+ {
+ throw std::runtime_error("external work failed");
+ }
+ catch (...)
+ {
+ Token.Fail(std::current_exception());
+ }
+
+ CHECK_THROWS_WITH(Work.Wait(), "external work failed");
+ CHECK(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_mixed_with_scheduled")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<uint32_t> ScheduledCount = 0;
+ for (uint32_t I = 0; I < 3; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ ScheduledCount++;
+ });
+ }
+
+ std::vector<ParallelWork::ExternalWorkToken> Tokens;
+ for (uint32_t I = 0; I < 3; I++)
+ {
+ Tokens.push_back(Work.RegisterExternal());
+ }
+
+ std::thread Completer([&]() {
+ for (auto& Token : Tokens)
+ {
+ Sleep(5);
+ Token.Complete();
+ }
+ });
+
+ Work.Wait();
+ Completer.join();
+
+ CHECK_EQ(ScheduledCount.load(), 3u);
+}
+
TEST_SUITE_END();
void