From 14f4baabfa56eedb7b71235b951e993da4f641ef Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 15 May 2023 17:55:52 +0200 Subject: all threads should be named (#304) * added WorkerThreadPool naming, packaged_task support * name the http.sys thread pool service threads * added http.sys I/O threadpool naming * upstream cache I/O thread naming --- src/zencore/workthreadpool.cpp | 221 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 196 insertions(+), 25 deletions(-) (limited to 'src/zencore/workthreadpool.cpp') diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index b4328cdbd..cc21e717a 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -2,7 +2,21 @@ #include +#include #include +#include +#include +#include +#include + +#include +#include + +#define ZEN_USE_WINDOWS_THREADPOOL 1 + +#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL +# include +#endif namespace zen { @@ -16,47 +30,143 @@ namespace detail { }; } // namespace detail -WorkerThreadPool::WorkerThreadPool(int InThreadCount) +////////////////////////////////////////////////////////////////////////// + +#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS + +namespace { + thread_local bool t_IsThreadNamed{false}; +} + +struct WorkerThreadPool::Impl { - for (int i = 0; i < InThreadCount; ++i) + PTP_POOL m_ThreadPool = nullptr; + PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; + TP_CALLBACK_ENVIRON m_CallbackEnvironment; + PTP_WORK m_Work = nullptr; + + std::string m_WorkerThreadBaseName; + std::atomic m_WorkerThreadCounter{0}; + + RwLock m_QueueLock; + std::deque> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) { - m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + // Thread pool setup + + m_ThreadPool = CreateThreadpool(NULL); + + SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); + SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + + InitializeThreadpoolEnvironment(&m_CallbackEnvironment); + + m_CleanupGroup = CreateThreadpoolCleanupGroup(); + + SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool); + SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); + + m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment); } -} -WorkerThreadPool::~WorkerThreadPool() -{ - m_WorkQueue.CompleteAdding(); + ~Impl() + { + WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE); + CloseThreadpoolWork(m_Work); + } - for (std::thread& Thread : m_WorkerThreads) + void ScheduleWork(Ref Work) { - Thread.join(); + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + SubmitThreadpoolWork(m_Work); } + [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } - m_WorkerThreads.clear(); -} + static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) + { + ZEN_UNUSED(Instance, Work); + Impl* ThisPtr = reinterpret_cast(Context); + ThisPtr->DoWork(); + } -void -WorkerThreadPool::ScheduleWork(Ref Work) -{ - m_WorkQueue.Enqueue(std::move(Work)); -} + void DoWork() + { + if (!t_IsThreadNamed) + { + t_IsThreadNamed = true; + const int ThreadIndex = ++m_WorkerThreadCounter; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } -void -WorkerThreadPool::ScheduleWork(std::function&& Work) + Ref WorkFromQueue; + + { + RwLock::ExclusiveLockScope _{m_QueueLock}; + WorkFromQueue = std::move(m_WorkQueue.front()); + m_WorkQueue.pop_front(); + } + + WorkFromQueue->Execute(); + } +}; + +#else + +struct WorkerThreadPool::ThreadStartInfo { - m_WorkQueue.Enqueue(Ref(new detail::LambdaWork(Work))); -} + int ThreadNumber; + zen::Latch* Latch; +}; -[[nodiscard]] size_t -WorkerThreadPool::PendingWork() const +struct WorkerThreadPool::Impl { - return m_WorkQueue.Size(); -} + void WorkerThreadFunction(ThreadStartInfo Info); + std::string m_WorkerThreadBaseName; + std::vector m_WorkerThreads; + BlockingQueue> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + { + trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); + + zen::Latch WorkerLatch{InThreadCount}; + + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); + } + + WorkerLatch.Wait(); + + trace::ThreadGroupEnd(); + } + + ~Impl() + { + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); + } + + void ScheduleWork(Ref Work) { m_WorkQueue.Enqueue(std::move(Work)); } + [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } +}; void -WorkerThreadPool::WorkerThreadFunction() +WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) { + SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber)); + + Info.Latch->CountDown(); + do { Ref Work; @@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction() } while (true); } +#endif + +////////////////////////////////////////////////////////////////////////// +WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread") +{ +} + +WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) +{ + m_Impl = std::make_unique(InThreadCount, WorkerThreadBaseName); +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_Impl.reset(); +} + +void +WorkerThreadPool::ScheduleWork(Ref Work) +{ + m_Impl->ScheduleWork(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function&& Work) +{ + ScheduleWork(Ref(new detail::LambdaWork(Work))); +} + +[[nodiscard]] size_t +WorkerThreadPool::PendingWorkItemCount() const +{ + return m_Impl->PendingWorkItemCount(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +workthreadpool_forcelink() +{ +} + +using namespace std::literals; + +TEST_CASE("threadpool.basic") +{ + WorkerThreadPool Threadpool{1}; + + auto Future42 = Threadpool.EnqueueTask(std::packaged_task{[] { return 42; }}); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task{[] { return 99; }}); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task{[] { throw std::runtime_error("meep!"); }}); + + CHECK_EQ(Future42.get(), 42); + CHECK_EQ(Future99.get(), 99); + CHECK_THROWS(FutureThrow.get()); +} + +#endif + } // namespace zen -- cgit v1.2.3