From 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 16 May 2025 19:51:36 +0200 Subject: parallel work handle dispatch exception (#400) - Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception --- src/zenutil/parallelwork.cpp | 192 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/zenutil/parallelwork.cpp (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..516d70e28 --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,192 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#include +#include +#include + +#include + +#if ZEN_WITH_TESTS +# include +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_AbortFlag.store(true); + m_PendingWork.Wait(); + ZEN_ASSERT(m_PendingWork.Remaining() == 0); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try + { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic ExecutedCount; + try + { + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen -- cgit v1.2.3