diff options
Diffstat (limited to 'src/zencore')
| -rw-r--r-- | src/zencore/filesystem.cpp | 42 | ||||
| -rw-r--r-- | src/zencore/include/zencore/workthreadpool.h | 23 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 68 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 151 | ||||
| -rw-r--r-- | src/zencore/zencore.cpp | 7 |
5 files changed, 171 insertions, 120 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8327838c9..5125beeca 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -2439,26 +2439,28 @@ GetDirectoryContent(const std::filesystem::path& RootDir, PendingWorkCount.AddCount(1); try { - WorkerPool.ScheduleWork([WorkerPool = &WorkerPool, - PendingWorkCount = &PendingWorkCount, - Visitor = Visitor, - Flags = Flags, - Path = std::move(Path), - RelativeRoot = RelativeRoot / DirectoryName]() { - ZEN_ASSERT(Visitor); - auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); - try - { - MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(Path, SubVisitor); - Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); - } - }); + WorkerPool.ScheduleWork( + [WorkerPool = &WorkerPool, + PendingWorkCount = &PendingWorkCount, + Visitor = Visitor, + Flags = Flags, + Path = std::move(Path), + RelativeRoot = RelativeRoot / DirectoryName]() { + ZEN_ASSERT(Visitor); + auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); + try + { + MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(Path, SubVisitor); + Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); + } + }, + WorkerThreadPool::EMode::DisableBacklog); } catch (const std::exception Ex) { diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h index 62356495c..4c38dd651 100644 --- a/src/zencore/include/zencore/workthreadpool.h +++ b/src/zencore/include/zencore/workthreadpool.h @@ -18,11 +18,7 @@ struct IWork : public RefCounted { virtual void Execute() = 0; - inline std::exception_ptr GetException() { return m_Exception; } - private: - std::exception_ptr m_Exception; - friend class WorkerThreadPool; }; @@ -35,13 +31,18 @@ public: WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName); ~WorkerThreadPool(); - void ScheduleWork(Ref<IWork> Work); - void ScheduleWork(std::function<void()>&& Work); + // Decides what to do if there are no free workers in the pool when the work is submitted + enum class EMode + { + EnableBacklog, // The work will be added to a backlog of work to do + DisableBacklog // The work will be executed synchronously in the caller thread + }; + + void ScheduleWork(Ref<IWork> Work, EMode Mode); + void ScheduleWork(std::function<void()>&& Work, EMode Mode); template<typename Func> - auto EnqueueTask(std::packaged_task<Func> Task); - - [[nodiscard]] size_t PendingWorkItemCount() const; + auto EnqueueTask(std::packaged_task<Func> Task, EMode Mode); private: struct Impl; @@ -54,7 +55,7 @@ private: template<typename Func> auto -WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) +WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task, EMode Mode) { struct FutureWork : IWork { @@ -67,7 +68,7 @@ WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) Ref<FutureWork> Work{new FutureWork(std::move(Task))}; auto Future = Work->m_Task.get_future(); - ScheduleWork(std::move(Work)); + ScheduleWork(std::move(Work), Mode); return Future; } diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 5d727b69c..4aa8c113e 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -109,10 +109,12 @@ public: WorkerCounter.AddCount(1); try { - WorkerPool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); - Worker(); - }); + WorkerPool.ScheduleWork( + [&]() { + auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); + Worker(); + }, + WorkerThreadPool::EMode::EnableBacklog); return {.Id = NewJobId}; } catch (const std::exception& Ex) @@ -466,34 +468,36 @@ TEST_CASE("JobQueue") for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); - Pool.ScheduleWork([&Queue, &JobsLatch, I]() { - auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - JobsLatch.AddCount(1); - auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { - auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep", "", 100, 100); - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep again", "", 100, 50); - if ((I & 0xFF) == 0x10) - { - zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); - } - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("done", "", 100, 0); - }); - }); + Pool.ScheduleWork( + [&Queue, &JobsLatch, I]() { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + JobsLatch.AddCount(1); + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { + auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep", "", 100, 100); + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep again", "", 100, 50); + if ((I & 0xFF) == 0x10) + { + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); + } + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("done", "", 100, 0); + }); + }, + WorkerThreadPool::EMode::EnableBacklog); } auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string { diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 445fe939e..e241c0de8 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -5,6 +5,7 @@ #include <zencore/blockingqueue.h> #include <zencore/except.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/thread.h> @@ -13,6 +14,10 @@ #include <thread> #include <vector> +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + #define ZEN_USE_WINDOWS_THREADPOOL 1 #if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL @@ -41,18 +46,23 @@ namespace { struct WorkerThreadPool::Impl { + const int m_ThreadCount = 0; PTP_POOL m_ThreadPool = nullptr; PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; TP_CALLBACK_ENVIRON m_CallbackEnvironment; PTP_WORK m_Work = nullptr; - std::string m_WorkerThreadBaseName; - std::atomic<int> m_WorkerThreadCounter{0}; + std::string m_WorkerThreadBaseName; + std::atomic<size_t> m_WorkerThreadCounter{0}; + std::atomic<int> m_FreeWorkerCount{0}; - RwLock m_QueueLock; + mutable RwLock m_QueueLock; std::deque<Ref<IWork>> m_WorkQueue; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + : m_ThreadCount(InThreadCount) + , m_WorkerThreadBaseName(WorkerThreadBaseName) + , m_FreeWorkerCount(m_ThreadCount) { // Thread pool setup @@ -62,11 +72,11 @@ struct WorkerThreadPool::Impl ThrowLastError("CreateThreadpool failed"); } - if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount)) + if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) { ThrowLastError("SetThreadpoolThreadMinimum failed"); } - SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); @@ -93,12 +103,29 @@ struct WorkerThreadPool::Impl CloseThreadpool(m_ThreadPool); } - void ScheduleWork(Ref<IWork> Work) + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) { - m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + if (Mode == WorkerThreadPool::EMode::DisableBacklog) + { + if (m_FreeWorkerCount <= 0) + { + return Work; + } + RwLock::ExclusiveLockScope _(m_QueueLock); + const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size()); + if (QueuedCount >= m_FreeWorkerCount) + { + return Work; + } + m_WorkQueue.push_back(std::move(Work)); + } + else + { + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + } SubmitThreadpoolWork(m_Work); + return {}; } - [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) { @@ -109,10 +136,13 @@ struct WorkerThreadPool::Impl void DoWork() { + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + if (!t_IsThreadNamed) { t_IsThreadNamed = true; - const int ThreadIndex = ++m_WorkerThreadCounter; + const size_t ThreadIndex = ++m_WorkerThreadCounter; zen::ExtendableStringBuilder<128> ThreadName; ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; SetCurrentThreadName(ThreadName); @@ -121,7 +151,7 @@ struct WorkerThreadPool::Impl Ref<IWork> WorkFromQueue; { - RwLock::ExclusiveLockScope _{m_QueueLock}; + RwLock::ExclusiveLockScope __{m_QueueLock}; WorkFromQueue = std::move(m_WorkQueue.front()); m_WorkQueue.pop_front(); } @@ -141,20 +171,25 @@ struct WorkerThreadPool::ThreadStartInfo struct WorkerThreadPool::Impl { + const int m_ThreadCount = 0; void WorkerThreadFunction(ThreadStartInfo Info); std::string m_WorkerThreadBaseName; std::vector<std::thread> m_WorkerThreads; BlockingQueue<Ref<IWork>> m_WorkQueue; + std::atomic<int> m_FreeWorkerCount{0}; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + : m_ThreadCount(InThreadCount) + , m_WorkerThreadBaseName(WorkerThreadBaseName) + , m_FreeWorkerCount(m_ThreadCount) { # if ZEN_WITH_TRACE trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); # endif - zen::Latch WorkerLatch{InThreadCount}; + zen::Latch WorkerLatch{m_ThreadCount}; - for (int i = 0; i < InThreadCount; ++i) + for (int i = 0; i < m_ThreadCount; ++i) { m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); } @@ -181,8 +216,23 @@ struct WorkerThreadPool::Impl m_WorkerThreads.clear(); } - void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); } - [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) + { + if (Mode == WorkerThreadPool::EMode::DisableBacklog) + { + if (m_FreeWorkerCount <= 0) + { + return Work; + } + const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size()); + if (QueuedCount >= m_FreeWorkerCount) + { + return Work; + } + } + m_WorkQueue.Enqueue(std::move(Work)); + return {}; + } }; void @@ -197,21 +247,23 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) Ref<IWork> Work; if (m_WorkQueue.WaitAndDequeue(Work)) { + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + try { ZEN_TRACE_CPU_FLUSH("AsyncWork"); Work->Execute(); + Work = {}; } catch (const AssertException& Ex) { - Work->m_Exception = std::current_exception(); - + Work = {}; ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); } catch (const std::exception& e) { - Work->m_Exception = std::current_exception(); - + Work = {}; ZEN_WARN("Caught exception in worker thread: {}", e.what()); } } @@ -243,48 +295,38 @@ WorkerThreadPool::~WorkerThreadPool() } void -WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +WorkerThreadPool::ScheduleWork(Ref<IWork> Work, EMode Mode) { if (m_Impl) { - m_Impl->ScheduleWork(std::move(Work)); - } - else - { - try + if (Work = m_Impl->ScheduleWork(std::move(Work), Mode); !Work) { - ZEN_TRACE_CPU_FLUSH("SyncWork"); - Work->Execute(); - } - catch (const AssertException& Ex) - { - Work->m_Exception = std::current_exception(); - - ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + return; } - catch (const std::exception& e) - { - Work->m_Exception = std::current_exception(); + } - ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what()); - } + try + { + ZEN_TRACE_CPU_FLUSH("SyncWork"); + Work->Execute(); + Work = {}; + } + catch (const AssertException& Ex) + { + Work = {}; + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) + { + Work = {}; + ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what()); } } void -WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) -{ - ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work)))); -} - -[[nodiscard]] size_t -WorkerThreadPool::PendingWorkItemCount() const +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work, EMode Mode) { - if (m_Impl) - { - return m_Impl->PendingWorkItemCount(); - } - return 0; + ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))), Mode); } ////////////////////////////////////////////////////////////////////////// @@ -302,9 +344,10 @@ TEST_CASE("threadpool.basic") { WorkerThreadPool Threadpool{1}; - auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}); - auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}); - auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}); + auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}, WorkerThreadPool::EMode::EnableBacklog); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}, WorkerThreadPool::EMode::EnableBacklog); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}, + WorkerThreadPool::EMode::EnableBacklog); CHECK_EQ(Future42.get(), 42); CHECK_EQ(Future99.get(), 99); diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index 51e06ae14..5a6232318 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -350,9 +350,10 @@ TEST_CASE("Assert.Callstack") WorkerThreadPool Pool(1); auto Task = Pool.EnqueueTask(std::packaged_task<int()>{[] { - ZEN_ASSERT(false); - return 1; - }}); + ZEN_ASSERT(false); + return 1; + }}, + WorkerThreadPool::EMode::EnableBacklog); try { |