diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 12:38:35 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 12:38:35 +0200 |
| commit | 5361ee1c77b68bb14237169660840d6d63a74892 (patch) | |
| tree | 3ad259133e09485a14506be38e43ec5b62a050f2 /src/zencore/include | |
| parent | move chunking code to zenremotestore lib (#545) (diff) | |
| download | zen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz zen-5361ee1c77b68bb14237169660840d6d63a74892.zip | |
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp
* remove dependency to zenutil/workerpools.h from buildstoragecache.cpp
* remove unneded include
* move jupiter helpers to zenremotestore
* move parallelwork to zencore
* remove zenutil dependency from zenremotestore
* clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zencore/include')
| -rw-r--r-- | src/zencore/include/zencore/parallelwork.h | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/src/zencore/include/zencore/parallelwork.h b/src/zencore/include/zencore/parallelwork.h new file mode 100644 index 000000000..05146d644 --- /dev/null +++ b/src/zencore/include/zencore/parallelwork.h @@ -0,0 +1,80 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork( + [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try + { + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }, + m_Mode); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + const WorkerThreadPool::EMode m_Mode; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen |