aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-16 19:51:36 +0200
committerGitHub Enterprise <[email protected]>2025-05-16 19:51:36 +0200
commit4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 (patch)
tree1fbab083b3fe8919a36caa2d925c933f696a5791 /src/zenutil
parentvalidate custom fields (#399) (diff)
downloadzen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.tar.xz
zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.zip
parallel work handle dispatch exception (#400)
- Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp4
-rw-r--r--src/zenutil/chunkedcontent.cpp52
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h2
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h4
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h119
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h71
-rw-r--r--src/zenutil/parallelwork.cpp192
-rw-r--r--src/zenutil/zenutil.cpp2
8 files changed, 295 insertions, 151 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index f273ac699..88238effd 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -338,7 +338,7 @@ public:
return {};
}
- virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
{
if (IsFlushed)
{
@@ -358,7 +358,7 @@ public:
intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
if (UpdateCallback(Remaining))
{
- if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS))
{
UpdateCallback(0);
return;
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index 17b348f8d..ae129324e 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -11,7 +11,7 @@
#include <zenutil/chunkedfile.h>
#include <zenutil/chunkingcontroller.h>
-#include <zenutil/parallellwork.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -378,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag)
{
@@ -467,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
WorkerPool,
PendingWork);
PendingWork.CountDown();
- while (!PendingWork.Wait(UpdateInteralMS))
+ while (!PendingWork.Wait(UpdateIntervalMS))
{
UpdateCallback(AbortFlag.load(), PendingWork.Remaining());
}
@@ -731,7 +731,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
const std::filesystem::path& RootPath,
const FolderContent& Content,
const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag)
{
@@ -772,7 +772,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
for (uint32_t PathIndex : Order)
{
@@ -780,28 +780,26 @@ ChunkFolderContent(ChunkingStatistics& Stats,
{
break;
}
- Work.ScheduleWork(
- WorkerPool, // GetSyncWorkerPool()
- [&, PathIndex](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag)
- {
- IoHash RawHash = HashOneFile(Stats,
- InChunkingController,
- Result,
- ChunkHashToChunkIndex,
- RawHashToSequenceRawHashIndex,
- Lock,
- RootPath,
- PathIndex,
- AbortFlag);
- Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
- Stats.FilesProcessed++;
- }
- },
- Work.DefaultErrorFunction());
- }
-
- Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool()
+ [&, PathIndex](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ IoHash RawHash = HashOneFile(Stats,
+ InChunkingController,
+ Result,
+ ChunkHashToChunkIndex,
+ RawHashToSequenceRawHashIndex,
+ Lock,
+ RootPath,
+ PathIndex,
+ AbortFlag);
+ Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
+ Stats.FilesProcessed++;
+ }
+ });
+ }
+
+ Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted);
ZEN_UNUSED(PendingWork);
UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining());
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index cab35328d..e1fb73fd4 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -44,7 +44,7 @@ public:
virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
virtual void Flush(
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
};
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 57b55cb8e..d33869be2 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
@@ -116,7 +116,7 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
const std::filesystem::path& RootPath,
const FolderContent& Content,
const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
deleted file mode 100644
index 8ea77c65d..000000000
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ /dev/null
@@ -1,119 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/thread.h>
-#include <zencore/workthreadpool.h>
-
-#include <atomic>
-
-class ParallellWorkException : public std::runtime_error
-{
-public:
- explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {}
-
- const std::vector<std::string> m_Errors;
-};
-
-namespace zen {
-
-class ParallellWork
-{
-public:
- ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {}
-
- ~ParallellWork()
- {
- // Make sure to call Wait before destroying
- ZEN_ASSERT(m_PendingWork.Remaining() == 0);
- }
-
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
- {
- return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
- AbortFlag = true;
- };
- }
-
- void ScheduleWork(WorkerThreadPool& WorkerPool,
- std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
- {
- m_PendingWork.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] {
- try
- {
- Work(m_AbortFlag);
- }
- catch (const AssertException& AssertEx)
- {
- OnError(
- std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
- m_AbortFlag);
- }
- catch (const std::system_error& SystemError)
- {
- if (IsOOM(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else if (IsOOD(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else
- {
- OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- }
- catch (const std::exception& Ex)
- {
- OnError(Ex, m_AbortFlag);
- }
- m_PendingWork.CountDown();
- });
- }
- catch (const std::exception&)
- {
- m_PendingWork.CountDown();
- throw;
- }
- }
-
- void Abort() { m_AbortFlag = true; }
-
- bool IsAborted() const { return m_AbortFlag.load(); }
-
- void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- while (!m_PendingWork.Wait(UpdateInteralMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
- }
- if (m_Errors.size() == 1)
- {
- throw std::runtime_error(m_Errors.front());
- }
- else if (m_Errors.size() > 1)
- {
- throw ParallellWorkException(std::move(m_Errors));
- }
- }
- Latch& PendingWork() { return m_PendingWork; }
-
-private:
- std::atomic<bool>& m_AbortFlag;
- Latch m_PendingWork;
-
- RwLock m_ErrorLock;
- std::vector<std::string> m_Errors;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
new file mode 100644
index 000000000..08e730b28
--- /dev/null
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -0,0 +1,71 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#include <atomic>
+
+namespace zen {
+
+class ParallelWork
+{
+public:
+ ParallelWork(std::atomic<bool>& AbortFlag);
+
+ ~ParallelWork();
+
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback;
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
+ {
+ m_PendingWork.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ try
+ {
+ Work(m_AbortFlag);
+ }
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ m_PendingWork.CountDown();
+ });
+ }
+ catch (const std::exception&)
+ {
+ m_PendingWork.CountDown();
+ throw;
+ }
+ }
+
+ void Abort() { m_AbortFlag = true; }
+
+ bool IsAborted() const { return m_AbortFlag.load(); }
+
+ void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback);
+
+ void Wait();
+
+ Latch& PendingWork() { return m_PendingWork; }
+
+private:
+ ExceptionCallback DefaultErrorFunction();
+ void RethrowErrors();
+
+ std::atomic<bool>& m_AbortFlag;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::exception_ptr> m_Errors;
+};
+
+void parallellwork_forcelink();
+
+} // namespace zen
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
new file mode 100644
index 000000000..516d70e28
--- /dev/null
+++ b/src/zenutil/parallelwork.cpp
@@ -0,0 +1,192 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/parallelwork.h>
+
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#include <typeinfo>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1)
+{
+}
+
+ParallelWork::~ParallelWork()
+{
+ try
+ {
+ if (!m_DispatchComplete)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ ZEN_WARN(
+ "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
+ "to complete");
+ m_PendingWork.CountDown();
+ }
+ m_AbortFlag.store(true);
+ m_PendingWork.Wait();
+ ZEN_ASSERT(m_PendingWork.Remaining() == 0);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what());
+ }
+}
+
+ParallelWork::ExceptionCallback
+ParallelWork::DefaultErrorFunction()
+{
+ return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ AbortFlag = true;
+ };
+}
+
+void
+ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+
+ while (!m_PendingWork.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::Wait()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_PendingWork.Wait();
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::RethrowErrors()
+{
+ if (!m_Errors.empty())
+ {
+ if (m_Errors.size() > 1)
+ {
+ ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:");
+ auto It = m_Errors.begin() + 1;
+ while (It != m_Errors.end())
+ {
+ try
+ {
+ std::rethrow_exception(*It);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_INFO(" {}", Ex.what());
+ }
+ It++;
+ }
+ }
+ std::exception_ptr Ex = m_Errors.front();
+ m_Errors.clear();
+ std::rethrow_exception(Ex);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("parallellwork.nowork")
+{
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.basic")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
+ }
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.throws_in_work")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 10; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ if (I > 3)
+ {
+ throw std::runtime_error("We throw in async thread");
+ }
+ else
+ {
+ Sleep(10);
+ }
+ });
+ }
+ CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
+}
+
+TEST_CASE("parallellwork.throws_in_dispatch")
+{
+ WorkerThreadPool WorkerPool(2);
+ std::atomic<uint32_t> ExecutedCount;
+ try
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ ExecutedCount++;
+ });
+ if (I == 3)
+ {
+ throw std::runtime_error("We throw in dispatcher thread");
+ }
+ }
+ CHECK(false);
+ }
+ catch (const std::runtime_error& Ex)
+ {
+ CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
+ CHECK_LE(ExecutedCount.load(), 4);
+ }
+}
+
+void
+parallellwork_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index aff9156f4..fe23b00c1 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -8,6 +8,7 @@
# include <zenutil/cache/rpcrecording.h>
# include <zenutil/chunkedfile.h>
# include <zenutil/commandlineoptions.h>
+# include <zenutil/parallelwork.h>
namespace zen {
@@ -19,6 +20,7 @@ zenutil_forcelinktests()
cacherequests_forcelink();
chunkedfile_forcelink();
commandlineoptions_forcelink();
+ parallellwork_forcelink();
}
} // namespace zen