From 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 16 May 2025 19:51:36 +0200 Subject: parallel work handle dispatch exception (#400) - Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception --- src/zenutil/parallelwork.cpp | 192 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/zenutil/parallelwork.cpp (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..516d70e28 --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,192 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#include +#include +#include + +#include + +#if ZEN_WITH_TESTS +# include +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_AbortFlag.store(true); + m_PendingWork.Wait(); + ZEN_ASSERT(m_PendingWork.Remaining() == 0); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try + { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic ExecutedCount; + try + { + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen -- cgit v1.2.3 From 49701314f570da3622f11eb37cc889c7d39d9a93 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 19 May 2025 22:25:58 +0200 Subject: handle exception with batch work (#401) * use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files --- src/zenutil/parallelwork.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 516d70e28..91591375a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -30,7 +30,6 @@ ParallelWork::~ParallelWork() "to complete"); m_PendingWork.CountDown(); } - m_AbortFlag.store(true); m_PendingWork.Wait(); ZEN_ASSERT(m_PendingWork.Remaining() == 0); } -- cgit v1.2.3 From 42aa2c9a8124ada33c88f5c608e4865b1ff84c64 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 30 May 2025 11:51:05 +0200 Subject: faster oplog validate (#408) Improvement: Faster oplog validate to reduce GC wall time and disk I/O pressure --- src/zenutil/parallelwork.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 91591375a..ecacc4b5a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -85,7 +85,7 @@ ParallelWork::RethrowErrors() { if (m_Errors.size() > 1) { - ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); auto It = m_Errors.begin() + 1; while (It != m_Errors.end()) { -- cgit v1.2.3 From 40b9386054de3c23f77da74eefaa743240d164fd Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 Jun 2025 14:40:02 +0200 Subject: pause, resume and abort running builds cmd (#421) - Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command - `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself - Improvement: Process report now indicates if it is pausing or aborting --- src/zenutil/parallelwork.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index ecacc4b5a..67fc03c04 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -14,7 +14,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +ParallelWork::ParallelWork(std::atomic& AbortFlag, std::atomic& PauseFlag) +: m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_PendingWork(1) { } @@ -59,7 +62,7 @@ ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) while (!m_PendingWork.Wait(UpdateIntervalMS)) { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); } RethrowErrors(); @@ -111,7 +114,8 @@ ParallelWork::RethrowErrors() TEST_CASE("parallellwork.nowork") { std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); Work.Wait(); } @@ -120,7 +124,8 @@ TEST_CASE("parallellwork.basic") WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic& AbortFlag) { CHECK(!AbortFlag); }); @@ -133,7 +138,8 @@ TEST_CASE("parallellwork.throws_in_work") WorkerThreadPool WorkerPool(2); std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic& AbortFlag) { @@ -158,7 +164,8 @@ TEST_CASE("parallellwork.throws_in_dispatch") try { std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic& AbortFlag) { -- cgit v1.2.3 From d000167e12c6dde651ef86be9f67552291ff1b7d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 16 Jun 2025 13:17:54 +0200 Subject: graceful wait in parallelwork destructor (#438) * exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues --- src/zenutil/parallelwork.cpp | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'src/zenutil/parallelwork.cpp') diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 67fc03c04..aa806438b 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -34,7 +35,33 @@ ParallelWork::~ParallelWork() m_PendingWork.CountDown(); } m_PendingWork.Wait(); - ZEN_ASSERT(m_PendingWork.Remaining() == 0); + ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}", + RemainingWork, + CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + + uint32_t WaitedMs = 0; + while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) + { + Sleep(50); + WaitedMs += 50; + } + RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork) + } + else + { + ZEN_INFO("ParallelWork destructor safety wait succeeded"); + } + } } catch (const std::exception& Ex) { -- cgit v1.2.3