diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-16 19:51:36 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-16 19:51:36 +0200 |
| commit | 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 (patch) | |
| tree | 1fbab083b3fe8919a36caa2d925c933f696a5791 /src/zenutil | |
| parent | validate custom fields (#399) (diff) | |
| download | zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.tar.xz zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.zip | |
parallel work handle dispatch exception (#400)
- Bugfix: Wait for async threads to complete if dispatching of work using ParallelWork throws an exception
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 52 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallellwork.h | 119 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 71 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 192 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
8 files changed, 295 insertions, 151 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index f273ac699..88238effd 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -338,7 +338,7 @@ public: return {}; } - virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override { if (IsFlushed) { @@ -358,7 +358,7 @@ public: intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); if (UpdateCallback(Remaining)) { - if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) { UpdateCallback(0); return; diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index 17b348f8d..ae129324e 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -11,7 +11,7 @@ #include <zenutil/chunkedfile.h> #include <zenutil/chunkingcontroller.h> -#include <zenutil/parallellwork.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -378,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag) { @@ -467,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, WorkerPool, PendingWork); PendingWork.CountDown(); - while (!PendingWork.Wait(UpdateInteralMS)) + while (!PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(AbortFlag.load(), PendingWork.Remaining()); } @@ -731,7 +731,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, 
const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag) { @@ -772,7 +772,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); for (uint32_t PathIndex : Order) { @@ -780,28 +780,26 @@ ChunkFolderContent(ChunkingStatistics& Stats, { break; } - Work.ScheduleWork( - WorkerPool, // GetSyncWorkerPool() - [&, PathIndex](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - IoHash RawHash = HashOneFile(Stats, - InChunkingController, - Result, - ChunkHashToChunkIndex, - RawHashToSequenceRawHashIndex, - Lock, - RootPath, - PathIndex, - AbortFlag); - Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); - Stats.FilesProcessed++; - } - }, - Work.DefaultErrorFunction()); - } - - Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool() + [&, PathIndex](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + IoHash RawHash = HashOneFile(Stats, + InChunkingController, + Result, + ChunkHashToChunkIndex, + RawHashToSequenceRawHashIndex, + Lock, + RootPath, + PathIndex, + AbortFlag); + Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); + Stats.FilesProcessed++; + } + }); + } + + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted); ZEN_UNUSED(PendingWork); UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index cab35328d..e1fb73fd4 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -44,7 +44,7 @@ public: virtual std::vector<BlobExistsResult> BlobsExists(const Oid& 
BuildId, std::span<const IoHash> BlobHashes) = 0; virtual void Flush( - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; }; diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 57b55cb8e..d33869be2 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); @@ -116,7 +116,7 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h deleted file mode 100644 index 8ea77c65d..000000000 --- a/src/zenutil/include/zenutil/parallellwork.h +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. 
- -#pragma once - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -class ParallellWorkException : public std::runtime_error -{ -public: - explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {} - - const std::vector<std::string> m_Errors; -}; - -namespace zen { - -class ParallellWork -{ -public: - ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {} - - ~ParallellWork() - { - // Make sure to call Wait before destroying - ZEN_ASSERT(m_PendingWork.Remaining() == 0); - } - - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() - { - return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); - AbortFlag = true; - }; - } - - void ScheduleWork(WorkerThreadPool& WorkerPool, - std::function<void(std::atomic<bool>& AbortFlag)>&& Work, - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] { - try - { - Work(m_AbortFlag); - } - catch (const AssertException& AssertEx) - { - OnError( - std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), - m_AbortFlag); - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); - } - else if (IsOOD(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); - } - else - { - OnError(std::runtime_error(fmt::format("System error. 
Reason: {}", SystemError.what())), m_AbortFlag); - } - } - catch (const std::exception& Ex) - { - OnError(Ex, m_AbortFlag); - } - m_PendingWork.CountDown(); - }); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - while (!m_PendingWork.Wait(UpdateInteralMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); - } - if (m_Errors.size() == 1) - { - throw std::runtime_error(m_Errors.front()); - } - else if (m_Errors.size() > 1) - { - throw ParallellWorkException(std::move(m_Errors)); - } - } - Latch& PendingWork() { return m_PendingWork; } - -private: - std::atomic<bool>& m_AbortFlag; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::string> m_Errors; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h new file mode 100644 index 000000000..08e730b28 --- /dev/null +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -0,0 +1,71 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + try + { + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + m_PendingWork.CountDown(); + }); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..516d70e28 --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,192 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenutil/parallelwork.h> + +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#include <typeinfo> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_AbortFlag.store(true); + m_PendingWork.Wait(); + ZEN_ASSERT(m_PendingWork.Remaining() == 0); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try 
+ { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic<uint32_t> ExecutedCount; + try + { + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index aff9156f4..fe23b00c1 100644 --- a/src/zenutil/zenutil.cpp +++ 
b/src/zenutil/zenutil.cpp @@ -8,6 +8,7 @@ # include <zenutil/cache/rpcrecording.h> # include <zenutil/chunkedfile.h> # include <zenutil/commandlineoptions.h> +# include <zenutil/parallelwork.h> namespace zen { @@ -19,6 +20,7 @@ zenutil_forcelinktests() cacherequests_forcelink(); chunkedfile_forcelink(); commandlineoptions_forcelink(); + parallellwork_forcelink(); } } // namespace zen |