diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-16 19:51:36 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-16 19:51:36 +0200 |
| commit | 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 (patch) | |
| tree | 1fbab083b3fe8919a36caa2d925c933f696a5791 /src/zenutil/include | |
| parent | validate custom fields (#399) (diff) | |
| download | zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.tar.xz zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.zip | |
parallel work handle dispatch exception (#400)
- Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception
Diffstat (limited to 'src/zenutil/include')
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallellwork.h | 119 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 71 |
4 files changed, 74 insertions, 122 deletions
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index cab35328d..e1fb73fd4 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -44,7 +44,7 @@ public: virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; virtual void Flush( - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; }; diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 57b55cb8e..d33869be2 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); @@ -116,7 +116,7 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h deleted file mode 100644 index 8ea77c65d..000000000 --- a/src/zenutil/include/zenutil/parallellwork.h +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -class ParallellWorkException : public std::runtime_error -{ -public: - explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {} - - const std::vector<std::string> m_Errors; -}; - -namespace zen { - -class ParallellWork -{ -public: - ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {} - - ~ParallellWork() - { - // Make sure to call Wait before destroying - ZEN_ASSERT(m_PendingWork.Remaining() == 0); - } - - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() - { - return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); - AbortFlag = true; - }; - } - - void ScheduleWork(WorkerThreadPool& WorkerPool, - std::function<void(std::atomic<bool>& AbortFlag)>&& Work, - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] { - try - { - Work(m_AbortFlag); - } - catch (const AssertException& AssertEx) - { - OnError( - std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), - m_AbortFlag); - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); - } - else if (IsOOD(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); - } - else - { - OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag); - } - } - catch (const std::exception& Ex) - { - OnError(Ex, m_AbortFlag); - } - m_PendingWork.CountDown(); - }); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - while (!m_PendingWork.Wait(UpdateInteralMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); - } - if (m_Errors.size() == 1) - { - throw std::runtime_error(m_Errors.front()); - } - else if (m_Errors.size() > 1) - { - throw ParallellWorkException(std::move(m_Errors)); - } - } - Latch& PendingWork() { return m_PendingWork; } - -private: - std::atomic<bool>& m_AbortFlag; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::string> m_Errors; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h new file mode 100644 index 000000000..08e730b28 --- /dev/null +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -0,0 +1,71 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + try + { + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + m_PendingWork.CountDown(); + }); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen |