diff options
Diffstat (limited to 'src/zencore/parallelwork.cpp')
| -rw-r--r-- | src/zencore/parallelwork.cpp | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp index 94696f479..ec00fe0bc 100644 --- a/src/zencore/parallelwork.cpp +++ b/src/zencore/parallelwork.cpp @@ -2,6 +2,7 @@ #include <zencore/parallelwork.h> +#include <zencore/assertfmt.h> #include <zencore/callstack.h> #include <zencore/except.h> #include <zencore/fmtutils.h> @@ -11,6 +12,8 @@ #if ZEN_WITH_TESTS # include <zencore/testing.h> + +# include <thread> #endif // ZEN_WITH_TESTS namespace zen { @@ -90,6 +93,65 @@ ParallelWork::DefaultErrorFunction() } void +ParallelWork::RecordExternalError(std::exception_ptr Ex) +{ + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + m_AbortFlag = true; +} + +ParallelWork::ExternalWorkToken +ParallelWork::RegisterExternal() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_PendingWork.AddCount(1); + return ExternalWorkToken(this); +} + +void +ParallelWork::ExternalWorkToken::Complete() +{ + ZEN_ASSERT(m_Owner != nullptr); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex) +{ + ZEN_ASSERT(m_Owner != nullptr); + // Null exception_ptr would propagate as std::bad_exception via + // rethrow_exception(nullptr) and mask the real failure mode. Catches + // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); }) + // firing on a normal-return path where no exception is in flight. + ZEN_ASSERT(Ex != nullptr); + m_Owner->RecordExternalError(Ex); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Release() +{ + if (m_Owner != nullptr) + { + // Tests should fail loudly so that any leaked path surfaces immediately; + // in production we keep the safety-net countdown so a leak does not deadlock + // the latch but log it as an error rather than a warning - this is always + // a programming bug. +#if ZEN_WITH_TESTS + ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"); +#else + ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net"); + // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success. + m_Owner->RecordExternalError( + std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"))); +#endif + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; + } +} + +void ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) { ZEN_ASSERT(!m_DispatchComplete); @@ -257,6 +319,100 @@ TEST_CASE("parallellwork.limitqueue") Work.Wait(); } +TEST_CASE("parallellwork.external_basic") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::vector<ParallelWork::ExternalWorkToken> Tokens; + for (uint32_t I = 0; I < 5; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + for (auto& Token : Tokens) + { + Token.Complete(); + } + + Work.Wait(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_completes_from_other_thread") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + std::thread Worker([Token = std::move(Token)]() mutable { + Sleep(20); + Token.Complete(); + }); + + Work.Wait(); + Worker.join(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_fail_propagates_exception") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + try + { + throw std::runtime_error("external work failed"); + } + catch (...) + { + Token.Fail(std::current_exception()); + } + + CHECK_THROWS_WITH(Work.Wait(), "external work failed"); + CHECK(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_mixed_with_scheduled") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<uint32_t> ScheduledCount = 0; + for (uint32_t I = 0; I < 3; I++) + { + Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + ScheduledCount++; + }); + } + + std::vector<ParallelWork::ExternalWorkToken> Tokens; + for (uint32_t I = 0; I < 3; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + + std::thread Completer([&]() { + for (auto& Token : Tokens) + { + Sleep(5); + Token.Complete(); + } + }); + + Work.Wait(); + Completer.join(); + + CHECK_EQ(ScheduledCount.load(), 3u); +} + TEST_SUITE_END(); void |