diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenutil/parallelwork.cpp | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenutil/parallelwork.cpp')
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..aa806438b --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,225 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/parallelwork.h> + +#include <zencore/callstack.h> +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#include <typeinfo> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +: m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_PendingWork.Wait(); + ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}", + RemainingWork, + CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + + uint32_t WaitedMs = 0; + while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) + { + Sleep(50); + WaitedMs += 50; + } + RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork) + } + else + { + ZEN_INFO("ParallelWork destructor safety wait succeeded"); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try + { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic<uint32_t> ExecutedCount; + try + { + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen |