// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #if ZEN_WITH_TESTS # include #endif // ZEN_WITH_TESTS namespace zen { ParallelWork::ParallelWork(std::atomic& AbortFlag, std::atomic& PauseFlag) : m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_PendingWork(1) { } ParallelWork::~ParallelWork() { try { if (!m_DispatchComplete) { ZEN_ASSERT(m_PendingWork.Remaining() > 0); ZEN_WARN( "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " "to complete"); m_PendingWork.CountDown(); m_DispatchComplete = true; } const bool WaitSucceeded = m_PendingWork.Wait(); const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); if (!WaitSucceeded) { ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork); } if (RemainingWork != 0) { void* Frames[8]; uint32_t FrameCount = GetCallstack(2, 8, Frames); CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); }); ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork); uint32_t WaitedMs = 0; while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) { Sleep(50); WaitedMs += 50; } ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining(); if (RemainingWorkAfterSafetyWait != 0) { ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}", RemainingWork, RemainingWorkAfterSafetyWait, NiceLatencyNs(WaitedMs * 1000000u), CallstackToString(Callstack, " ")) } else { ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}", RemainingWork, NiceLatencyNs(WaitedMs * 1000000u), CallstackToString(Callstack, " ")); } } } catch (const std::exception& Ex) { ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what()); } } ParallelWork::ExceptionCallback ParallelWork::DefaultErrorFunction() { return [&](std::exception_ptr Ex, std::atomic& AbortFlag) { m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); AbortFlag = true; }; } void ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) { ZEN_ASSERT(!m_DispatchComplete); ZEN_ASSERT(m_PendingWork.Remaining() > 0); m_PendingWork.CountDown(); m_DispatchComplete = true; while (!m_PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); } RethrowErrors(); } void ParallelWork::Wait() { ZEN_ASSERT(!m_DispatchComplete); ZEN_ASSERT(m_PendingWork.Remaining() > 0); m_PendingWork.CountDown(); m_DispatchComplete = true; const bool WaitSucceeded = m_PendingWork.Wait(); const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); if (!WaitSucceeded) { ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork); } else if (RemainingWork != 0) { ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork); } RethrowErrors(); } void ParallelWork::RethrowErrors() { if (!m_Errors.empty()) { if (m_Errors.size() > 1) { ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); auto It = m_Errors.begin() + 1; while (It != m_Errors.end()) { try { std::rethrow_exception(*It); } catch (const std::exception& Ex) { ZEN_INFO(" {}", Ex.what()); } It++; } } std::exception_ptr Ex = m_Errors.front(); m_Errors.clear(); std::rethrow_exception(Ex); } } #if ZEN_WITH_TESTS TEST_CASE("parallellwork.nowork") { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); Work.Wait(); } TEST_CASE("parallellwork.basic") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); } Work.Wait(); } TEST_CASE("parallellwork.throws_in_work") { WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { ZEN_UNUSED(AbortFlag); if (I > 3) { throw std::runtime_error("We throw in async thread"); } else { Sleep(10); } }); } CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); } TEST_CASE("parallellwork.throws_in_dispatch") { WorkerThreadPool WorkerPool(2); std::atomic ExecutedCount; try { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } ExecutedCount++; }); if (I == 3) { throw std::runtime_error("We throw in dispatcher thread"); } } CHECK(false); } catch (const std::runtime_error& Ex) { CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); CHECK_LE(ExecutedCount.load(), 4); } } void parallellwork_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen