// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include #endif // ZEN_WITH_TESTS namespace zen { ParallelWork::ParallelWork(std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool::EMode Mode) : m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_Mode(Mode) , m_PendingWork(1) { } ParallelWork::~ParallelWork() { try { if (!m_DispatchComplete) { ZEN_ASSERT(m_PendingWork.Remaining() > 0); ZEN_WARN( "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " "to complete"); m_PendingWork.CountDown(); m_DispatchComplete = true; } const bool WaitSucceeded = m_PendingWork.Wait(); const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); if (!WaitSucceeded) { ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork); } if (RemainingWork != 0) { void* Frames[8]; uint32_t FrameCount = GetCallstack(2, 8, Frames); CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); }); ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork); uint32_t WaitedMs = 0; while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) { Sleep(50); WaitedMs += 50; } ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining(); if (RemainingWorkAfterSafetyWait != 0) { ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}", RemainingWork, RemainingWorkAfterSafetyWait, NiceLatencyNs(WaitedMs * 1000000u), CallstackToString(Callstack, " ")) } else { ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}", RemainingWork, NiceLatencyNs(WaitedMs * 1000000u), CallstackToString(Callstack, " ")); } } } catch (const std::exception& Ex) { ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what()); } } ParallelWork::ExceptionCallback ParallelWork::DefaultErrorFunction() { return [&](std::exception_ptr Ex, std::atomic& AbortFlag) { m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); AbortFlag = true; }; } void ParallelWork::RecordExternalError(std::exception_ptr Ex) { m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); m_AbortFlag = true; } ParallelWork::ExternalWorkToken ParallelWork::RegisterExternal() { ZEN_ASSERT(!m_DispatchComplete); m_PendingWork.AddCount(1); return ExternalWorkToken(this); } void ParallelWork::ExternalWorkToken::Complete() { ZEN_ASSERT(m_Owner != nullptr); m_Owner->m_PendingWork.CountDown(); m_Owner = nullptr; } void ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex) { ZEN_ASSERT(m_Owner != nullptr); // Null exception_ptr would propagate as std::bad_exception via // rethrow_exception(nullptr) and mask the real failure mode. Catches // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); }) // firing on a normal-return path where no exception is in flight. ZEN_ASSERT(Ex != nullptr); m_Owner->RecordExternalError(Ex); m_Owner->m_PendingWork.CountDown(); m_Owner = nullptr; } void ParallelWork::ExternalWorkToken::Release() { if (m_Owner != nullptr) { // Tests should fail loudly so that any leaked path surfaces immediately; // in production we keep the safety-net countdown so a leak does not deadlock // the latch but log it as an error rather than a warning - this is always // a programming bug. #if ZEN_WITH_TESTS ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"); #else ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net"); // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success. m_Owner->RecordExternalError( std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"))); #endif m_Owner->m_PendingWork.CountDown(); m_Owner = nullptr; } } void ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) { ZEN_ASSERT(!m_DispatchComplete); ZEN_ASSERT(m_PendingWork.Remaining() > 0); m_PendingWork.CountDown(); m_DispatchComplete = true; while (!m_PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); } RethrowErrors(); } void ParallelWork::Wait() { ZEN_ASSERT(!m_DispatchComplete); ZEN_ASSERT(m_PendingWork.Remaining() > 0); m_PendingWork.CountDown(); m_DispatchComplete = true; const bool WaitSucceeded = m_PendingWork.Wait(); const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); if (!WaitSucceeded) { ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork); } else if (RemainingWork != 0) { ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork); } RethrowErrors(); } void ParallelWork::RethrowErrors() { if (!m_Errors.empty()) { if (m_Errors.size() > 1) { ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); auto It = m_Errors.begin() + 1; while (It != m_Errors.end()) { try { std::rethrow_exception(*It); } catch (const std::exception& Ex) { ZEN_INFO(" {}", Ex.what()); } It++; } } std::exception_ptr Ex = m_Errors.front(); m_Errors.clear(); std::rethrow_exception(Ex); } } #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("core.parallelwork"); TEST_CASE("parallellwork.nowork") { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); Work.Wait(); } TEST_CASE("parallellwork.basic") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); } Work.Wait(); } TEST_CASE("parallellwork.throws_in_work") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { ZEN_UNUSED(AbortFlag); if (I > 3) { throw std::runtime_error("We throw in async thread"); } else { Sleep(10); } }); } CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); } TEST_CASE("parallellwork.throws_in_dispatch") { WorkerThreadPool WorkerPool(2); std::atomic ExecutedCount; try { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } ExecutedCount++; }); if (I == 3) { throw std::runtime_error("We throw in dispatcher thread"); } } CHECK(false); } catch (const std::runtime_error& Ex) { CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); CHECK_LE(ExecutedCount.load(), 4); } } TEST_CASE("parallellwork.limitqueue") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } Sleep(10); }); } Work.Wait(); } TEST_CASE("parallellwork.external_basic") { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::vector Tokens; for (uint32_t I = 0; I < 5; I++) { Tokens.push_back(Work.RegisterExternal()); } for (auto& Token : Tokens) { Token.Complete(); } Work.Wait(); CHECK_FALSE(AbortFlag.load()); } TEST_CASE("parallellwork.external_completes_from_other_thread") { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); auto Token = Work.RegisterExternal(); std::thread Worker([Token = std::move(Token)]() mutable { Sleep(20); Token.Complete(); }); Work.Wait(); Worker.join(); CHECK_FALSE(AbortFlag.load()); } TEST_CASE("parallellwork.external_fail_propagates_exception") { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); auto Token = Work.RegisterExternal(); try { throw std::runtime_error("external work failed"); } catch (...) { Token.Fail(std::current_exception()); } CHECK_THROWS_WITH(Work.Wait(), "external work failed"); CHECK(AbortFlag.load()); } TEST_CASE("parallellwork.external_mixed_with_scheduled") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic ScheduledCount = 0; for (uint32_t I = 0; I < 3; I++) { Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic& AbortFlag) { ZEN_UNUSED(AbortFlag); ScheduledCount++; }); } std::vector Tokens; for (uint32_t I = 0; I < 3; I++) { Tokens.push_back(Work.RegisterExternal()); } std::thread Completer([&]() { for (auto& Token : Tokens) { Sleep(5); Token.Complete(); } }); Work.Wait(); Completer.join(); CHECK_EQ(ScheduledCount.load(), 3u); } TEST_SUITE_END(); void parallellwork_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen