diff options
| author | Stefan Boberg <[email protected]> | 2023-05-15 17:55:52 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-15 17:55:52 +0200 |
| commit | 14f4baabfa56eedb7b71235b951e993da4f641ef (patch) | |
| tree | 86c08838a0179ae15910b791a1412c448340020d /src/zencore/workthreadpool.cpp | |
| parent | v0.2.11 (diff) | |
| download | zen-14f4baabfa56eedb7b71235b951e993da4f641ef.tar.xz zen-14f4baabfa56eedb7b71235b951e993da4f641ef.zip | |
all threads should be named (#304)
* added WorkerThreadPool naming, packaged_task support
* name the http.sys thread pool service threads
* added http.sys I/O threadpool naming
* upstream cache I/O thread naming
Diffstat (limited to 'src/zencore/workthreadpool.cpp')
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 221 |
1 files changed, 196 insertions, 25 deletions
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index b4328cdbd..cc21e717a 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -2,7 +2,21 @@ #include <zencore/workthreadpool.h> +#include <zencore/blockingqueue.h> #include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <thread> +#include <vector> + +#define ZEN_USE_WINDOWS_THREADPOOL 1 + +#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL +# include <zencore/windows.h> +#endif namespace zen { @@ -16,47 +30,143 @@ namespace detail { }; } // namespace detail -WorkerThreadPool::WorkerThreadPool(int InThreadCount) +////////////////////////////////////////////////////////////////////////// + +#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS + +namespace { + thread_local bool t_IsThreadNamed{false}; +} + +struct WorkerThreadPool::Impl { - for (int i = 0; i < InThreadCount; ++i) + PTP_POOL m_ThreadPool = nullptr; + PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; + TP_CALLBACK_ENVIRON m_CallbackEnvironment; + PTP_WORK m_Work = nullptr; + + std::string m_WorkerThreadBaseName; + std::atomic<int> m_WorkerThreadCounter{0}; + + RwLock m_QueueLock; + std::deque<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) { - m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + // Thread pool setup + + m_ThreadPool = CreateThreadpool(NULL); + + SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); + SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + + InitializeThreadpoolEnvironment(&m_CallbackEnvironment); + + m_CleanupGroup = CreateThreadpoolCleanupGroup(); + + SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool); + SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); + + m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment); } -} -WorkerThreadPool::~WorkerThreadPool() -{ - m_WorkQueue.CompleteAdding(); + ~Impl() + { + WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE); + CloseThreadpoolWork(m_Work); + } - for (std::thread& Thread : m_WorkerThreads) + void ScheduleWork(Ref<IWork> Work) { - Thread.join(); + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + SubmitThreadpoolWork(m_Work); } + [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } - m_WorkerThreads.clear(); -} + static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) + { + ZEN_UNUSED(Instance, Work); + Impl* ThisPtr = reinterpret_cast<Impl*>(Context); + ThisPtr->DoWork(); + } -void -WorkerThreadPool::ScheduleWork(Ref<IWork> Work) -{ - m_WorkQueue.Enqueue(std::move(Work)); -} + void DoWork() + { + if (!t_IsThreadNamed) + { + t_IsThreadNamed = true; + const int ThreadIndex = ++m_WorkerThreadCounter; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } -void -WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) + Ref<IWork> WorkFromQueue; + + { + RwLock::ExclusiveLockScope _{m_QueueLock}; + WorkFromQueue = std::move(m_WorkQueue.front()); + m_WorkQueue.pop_front(); + } + + WorkFromQueue->Execute(); + } +}; + +#else + +struct WorkerThreadPool::ThreadStartInfo { - m_WorkQueue.Enqueue(Ref<IWork>(new detail::LambdaWork(Work))); -} + int ThreadNumber; + zen::Latch* Latch; +}; -[[nodiscard]] size_t -WorkerThreadPool::PendingWork() const +struct WorkerThreadPool::Impl { - return m_WorkQueue.Size(); -} + void WorkerThreadFunction(ThreadStartInfo Info); + std::string m_WorkerThreadBaseName; + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + { + trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); + + zen::Latch WorkerLatch{InThreadCount}; + + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); + } + + WorkerLatch.Wait(); + + trace::ThreadGroupEnd(); + } + + ~Impl() + { + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); + } + + void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); } + [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } +}; void -WorkerThreadPool::WorkerThreadFunction() +WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) { + SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber)); + + Info.Latch->CountDown(); + do { Ref<IWork> Work; @@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction() } while (true); } +#endif + +////////////////////////////////////////////////////////////////////////// +WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread") +{ +} + +WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) +{ + m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_Impl.reset(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_Impl->ScheduleWork(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work))); +} + +[[nodiscard]] size_t +WorkerThreadPool::PendingWorkItemCount() const +{ + return m_Impl->PendingWorkItemCount(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +workthreadpool_forcelink() +{ +} + +using namespace std::literals; + +TEST_CASE("threadpool.basic") +{ + WorkerThreadPool Threadpool{1}; + + auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}); + + CHECK_EQ(Future42.get(), 42); + CHECK_EQ(Future99.get(), 99); + CHECK_THROWS(FutureThrow.get()); +} + +#endif + } // namespace zen |