diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 12:38:35 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 12:38:35 +0200 |
| commit | 5361ee1c77b68bb14237169660840d6d63a74892 (patch) | |
| tree | 3ad259133e09485a14506be38e43ec5b62a050f2 /src/zenutil/parallelwork.cpp | |
| parent | move chunking code to zenremotestore lib (#545) (diff) | |
| download | zen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz zen-5361ee1c77b68bb14237169660840d6d63a74892.zip | |
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp
* remove dependency to zenutil/workerpools.h from buildstoragecache.cpp
* remove unneded include
* move jupiter helpers to zenremotestore
* move parallelwork to zencore
* remove zenutil dependency from zenremotestore
* clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zenutil/parallelwork.cpp')
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 264 |
1 files changed, 0 insertions, 264 deletions
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp deleted file mode 100644 index 95417078a..000000000 --- a/src/zenutil/parallelwork.cpp +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/parallelwork.h> - -#include <zencore/callstack.h> -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> - -#include <typeinfo> - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -#endif // ZEN_WITH_TESTS - -namespace zen { - -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode) -: m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_Mode(Mode) -, m_PendingWork(1) -{ -} - -ParallelWork::~ParallelWork() -{ - try - { - if (!m_DispatchComplete) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - ZEN_WARN( - "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " - "to complete"); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - } - const bool WaitSucceeded = m_PendingWork.Wait(); - const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); - if (!WaitSucceeded) - { - ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork); - } - if (RemainingWork != 0) - { - void* Frames[8]; - uint32_t FrameCount = GetCallstack(2, 8, Frames); - CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); - auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); }); - ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork); - - uint32_t WaitedMs = 0; - while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) - { - Sleep(50); - WaitedMs += 50; - } - ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining(); - if (RemainingWorkAfterSafetyWait != 0) - { - ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}", - RemainingWork, - RemainingWorkAfterSafetyWait, - NiceLatencyNs(WaitedMs * 1000000u), - CallstackToString(Callstack, " ")) - } - else - { - ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}", - RemainingWork, - NiceLatencyNs(WaitedMs * 1000000u), - CallstackToString(Callstack, " ")); - } - } - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what()); - } -} - -ParallelWork::ExceptionCallback -ParallelWork::DefaultErrorFunction() -{ - return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); - AbortFlag = true; - }; -} - -void -ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) -{ - ZEN_ASSERT(!m_DispatchComplete); - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - - while (!m_PendingWork.Wait(UpdateIntervalMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); - } - - RethrowErrors(); -} - -void -ParallelWork::Wait() -{ - ZEN_ASSERT(!m_DispatchComplete); - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - - const bool WaitSucceeded = m_PendingWork.Wait(); - const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); - if (!WaitSucceeded) - { - ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork); - } - else if (RemainingWork != 0) - { - ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork); - } - - RethrowErrors(); -} - -void -ParallelWork::RethrowErrors() -{ - if (!m_Errors.empty()) - { - if (m_Errors.size() > 1) - { - ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); - auto It = m_Errors.begin() + 1; - while (It != m_Errors.end()) - { - try - { - std::rethrow_exception(*It); - } - catch (const std::exception& Ex) - { - ZEN_INFO(" {}", Ex.what()); - } - It++; - } - } - std::exception_ptr Ex = m_Errors.front(); - m_Errors.clear(); - std::rethrow_exception(Ex); - } -} - -#if ZEN_WITH_TESTS - -TEST_CASE("parallellwork.nowork") -{ - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - Work.Wait(); -} - -TEST_CASE("parallellwork.basic") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); - } - Work.Wait(); -} - -TEST_CASE("parallellwork.throws_in_work") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 10; I++) - { - Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { - ZEN_UNUSED(AbortFlag); - if (I > 3) - { - throw std::runtime_error("We throw in async thread"); - } - else - { - Sleep(10); - } - }); - } - CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); -} - -TEST_CASE("parallellwork.throws_in_dispatch") -{ - WorkerThreadPool WorkerPool(2); - std::atomic<uint32_t> ExecutedCount; - try - { - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - ExecutedCount++; - }); - if (I == 3) - { - throw std::runtime_error("We throw in dispatcher thread"); - } - } - CHECK(false); - } - catch (const std::runtime_error& Ex) - { - CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); - CHECK_LE(ExecutedCount.load(), 4); - } -} - -TEST_CASE("parallellwork.limitqueue") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - Sleep(10); - }); - } - Work.Wait(); -} - -void -parallellwork_forcelink() -{ -} -#endif // ZEN_WITH_TESTS - -} // namespace zen |