aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/parallelwork.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 12:38:35 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 12:38:35 +0200
commit5361ee1c77b68bb14237169660840d6d63a74892 (patch)
tree3ad259133e09485a14506be38e43ec5b62a050f2 /src/zenutil/parallelwork.cpp
parentmove chunking code to zenremotestore lib (#545) (diff)
downloadzen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz
zen-5361ee1c77b68bb14237169660840d6d63a74892.zip
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp * remove dependency to zenutil/workerpools.h from buildstoragecache.cpp * remove unneded include * move jupiter helpers to zenremotestore * move parallelwork to zencore * remove zenutil dependency from zenremotestore * clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zenutil/parallelwork.cpp')
-rw-r--r--src/zenutil/parallelwork.cpp264
1 files changed, 0 insertions, 264 deletions
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
deleted file mode 100644
index 95417078a..000000000
--- a/src/zenutil/parallelwork.cpp
+++ /dev/null
@@ -1,264 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/parallelwork.h>
-
-#include <zencore/callstack.h>
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-
-#include <typeinfo>
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-#endif // ZEN_WITH_TESTS
-
-namespace zen {
-
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
-: m_AbortFlag(AbortFlag)
-, m_PauseFlag(PauseFlag)
-, m_Mode(Mode)
-, m_PendingWork(1)
-{
-}
-
-ParallelWork::~ParallelWork()
-{
- try
- {
- if (!m_DispatchComplete)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- ZEN_WARN(
- "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
- "to complete");
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
- }
- const bool WaitSucceeded = m_PendingWork.Wait();
- const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
- if (!WaitSucceeded)
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork);
- }
- if (RemainingWork != 0)
- {
- void* Frames[8];
- uint32_t FrameCount = GetCallstack(2, 8, Frames);
- CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
- auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); });
- ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork);
-
- uint32_t WaitedMs = 0;
- while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
- {
- Sleep(50);
- WaitedMs += 50;
- }
- ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining();
- if (RemainingWorkAfterSafetyWait != 0)
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}",
- RemainingWork,
- RemainingWorkAfterSafetyWait,
- NiceLatencyNs(WaitedMs * 1000000u),
- CallstackToString(Callstack, " "))
- }
- else
- {
- ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}",
- RemainingWork,
- NiceLatencyNs(WaitedMs * 1000000u),
- CallstackToString(Callstack, " "));
- }
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what());
- }
-}
-
-ParallelWork::ExceptionCallback
-ParallelWork::DefaultErrorFunction()
-{
- return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
- AbortFlag = true;
- };
-}
-
-void
-ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
-{
- ZEN_ASSERT(!m_DispatchComplete);
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
-
- while (!m_PendingWork.Wait(UpdateIntervalMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
- }
-
- RethrowErrors();
-}
-
-void
-ParallelWork::Wait()
-{
- ZEN_ASSERT(!m_DispatchComplete);
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- m_DispatchComplete = true;
-
- const bool WaitSucceeded = m_PendingWork.Wait();
- const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
- if (!WaitSucceeded)
- {
- ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork);
- }
- else if (RemainingWork != 0)
- {
- ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork);
- }
-
- RethrowErrors();
-}
-
-void
-ParallelWork::RethrowErrors()
-{
- if (!m_Errors.empty())
- {
- if (m_Errors.size() > 1)
- {
- ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:");
- auto It = m_Errors.begin() + 1;
- while (It != m_Errors.end())
- {
- try
- {
- std::rethrow_exception(*It);
- }
- catch (const std::exception& Ex)
- {
- ZEN_INFO(" {}", Ex.what());
- }
- It++;
- }
- }
- std::exception_ptr Ex = m_Errors.front();
- m_Errors.clear();
- std::rethrow_exception(Ex);
- }
-}
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("parallellwork.nowork")
-{
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- Work.Wait();
-}
-
-TEST_CASE("parallellwork.basic")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
- }
- Work.Wait();
-}
-
-TEST_CASE("parallellwork.throws_in_work")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 10; I++)
- {
- Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
- ZEN_UNUSED(AbortFlag);
- if (I > 3)
- {
- throw std::runtime_error("We throw in async thread");
- }
- else
- {
- Sleep(10);
- }
- });
- }
- CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
-}
-
-TEST_CASE("parallellwork.throws_in_dispatch")
-{
- WorkerThreadPool WorkerPool(2);
- std::atomic<uint32_t> ExecutedCount;
- try
- {
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- ExecutedCount++;
- });
- if (I == 3)
- {
- throw std::runtime_error("We throw in dispatcher thread");
- }
- }
- CHECK(false);
- }
- catch (const std::runtime_error& Ex)
- {
- CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
- CHECK_LE(ExecutedCount.load(), 4);
- }
-}
-
-TEST_CASE("parallellwork.limitqueue")
-{
- WorkerThreadPool WorkerPool(2);
-
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
- for (uint32_t I = 0; I < 5; I++)
- {
- Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- Sleep(10);
- });
- }
- Work.Wait();
-}
-
-void
-parallellwork_forcelink()
-{
-}
-#endif // ZEN_WITH_TESTS
-
-} // namespace zen