From 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 16 May 2025 19:51:36 +0200 Subject: parallel work handle dispatch exception (#400) - Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception --- src/zenutil/chunkedcontent.cpp | 52 ++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 27 deletions(-) (limited to 'src/zenutil/chunkedcontent.cpp') diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index 17b348f8d..ae129324e 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include ZEN_THIRD_PARTY_INCLUDES_START @@ -378,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, std::function&& AcceptDirectory, std::function&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function&& UpdateCallback, std::atomic& AbortFlag) { @@ -467,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, WorkerPool, PendingWork); PendingWork.CountDown(); - while (!PendingWork.Wait(UpdateInteralMS)) + while (!PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(AbortFlag.load(), PendingWork.Remaining()); } @@ -731,7 +731,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function&& UpdateCallback, std::atomic& AbortFlag) { @@ -772,7 +772,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); for (uint32_t PathIndex : Order) { @@ -780,28 +780,26 @@ ChunkFolderContent(ChunkingStatistics& Stats, { break; } - Work.ScheduleWork( - WorkerPool, // GetSyncWorkerPool() - [&, PathIndex](std::atomic& AbortFlag) { - if (!AbortFlag) - { - IoHash RawHash = HashOneFile(Stats, - InChunkingController, - Result, - ChunkHashToChunkIndex, - RawHashToSequenceRawHashIndex, - Lock, - RootPath, - PathIndex, - AbortFlag); - Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); - Stats.FilesProcessed++; - } - }, - Work.DefaultErrorFunction()); - } - - Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool() + [&, PathIndex](std::atomic& AbortFlag) { + if (!AbortFlag) + { + IoHash RawHash = HashOneFile(Stats, + InChunkingController, + Result, + ChunkHashToChunkIndex, + RawHashToSequenceRawHashIndex, + Lock, + RootPath, + PathIndex, + AbortFlag); + Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); + Stats.FilesProcessed++; + } + }); + } + + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted); ZEN_UNUSED(PendingWork); UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); -- cgit v1.2.3