aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/workthreadpool.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-15 17:55:52 +0200
committerGitHub <[email protected]>2023-05-15 17:55:52 +0200
commit14f4baabfa56eedb7b71235b951e993da4f641ef (patch)
tree86c08838a0179ae15910b791a1412c448340020d /src/zencore/workthreadpool.cpp
parentv0.2.11 (diff)
downloadzen-14f4baabfa56eedb7b71235b951e993da4f641ef.tar.xz
zen-14f4baabfa56eedb7b71235b951e993da4f641ef.zip
all threads should be named (#304)
* added WorkerThreadPool naming, packaged_task support * name the http.sys thread pool service threads * added http.sys I/O threadpool naming * upstream cache I/O thread naming
Diffstat (limited to 'src/zencore/workthreadpool.cpp')
-rw-r--r--src/zencore/workthreadpool.cpp221
1 files changed, 196 insertions, 25 deletions
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index b4328cdbd..cc21e717a 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -2,7 +2,21 @@
#include <zencore/workthreadpool.h>
+#include <zencore/blockingqueue.h>
#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <thread>
+#include <vector>
+
+#define ZEN_USE_WINDOWS_THREADPOOL 1
+
+#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL
+# include <zencore/windows.h>
+#endif
namespace zen {
@@ -16,47 +30,143 @@ namespace detail {
};
} // namespace detail
-WorkerThreadPool::WorkerThreadPool(int InThreadCount)
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS
+
+namespace {
+ thread_local bool t_IsThreadNamed{false};
+}
+
+struct WorkerThreadPool::Impl
{
- for (int i = 0; i < InThreadCount; ++i)
+ PTP_POOL m_ThreadPool = nullptr;
+ PTP_CLEANUP_GROUP m_CleanupGroup = nullptr;
+ TP_CALLBACK_ENVIRON m_CallbackEnvironment;
+ PTP_WORK m_Work = nullptr;
+
+ std::string m_WorkerThreadBaseName;
+ std::atomic<int> m_WorkerThreadCounter{0};
+
+ RwLock m_QueueLock;
+ std::deque<Ref<IWork>> m_WorkQueue;
+
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
{
- m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this);
+ // Thread pool setup
+
+ m_ThreadPool = CreateThreadpool(NULL);
+
+ SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount);
+ SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2);
+
+ InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
+
+ m_CleanupGroup = CreateThreadpoolCleanupGroup();
+
+ SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool);
+ SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL);
+
+ m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment);
}
-}
-WorkerThreadPool::~WorkerThreadPool()
-{
- m_WorkQueue.CompleteAdding();
+ ~Impl()
+ {
+ WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE);
+ CloseThreadpoolWork(m_Work);
+ }
- for (std::thread& Thread : m_WorkerThreads)
+ void ScheduleWork(Ref<IWork> Work)
{
- Thread.join();
+ m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ SubmitThreadpoolWork(m_Work);
}
+ [[nodiscard]] size_t PendingWorkItemCount() const { return 0; }
- m_WorkerThreads.clear();
-}
+ static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
+ {
+ ZEN_UNUSED(Instance, Work);
+ Impl* ThisPtr = reinterpret_cast<Impl*>(Context);
+ ThisPtr->DoWork();
+ }
-void
-WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
-{
- m_WorkQueue.Enqueue(std::move(Work));
-}
+ void DoWork()
+ {
+ if (!t_IsThreadNamed)
+ {
+ t_IsThreadNamed = true;
+ const int ThreadIndex = ++m_WorkerThreadCounter;
+ zen::ExtendableStringBuilder<128> ThreadName;
+ ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex;
+ SetCurrentThreadName(ThreadName);
+ }
-void
-WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+ Ref<IWork> WorkFromQueue;
+
+ {
+ RwLock::ExclusiveLockScope _{m_QueueLock};
+ WorkFromQueue = std::move(m_WorkQueue.front());
+ m_WorkQueue.pop_front();
+ }
+
+ WorkFromQueue->Execute();
+ }
+};
+
+#else
+
+struct WorkerThreadPool::ThreadStartInfo
{
- m_WorkQueue.Enqueue(Ref<IWork>(new detail::LambdaWork(Work)));
-}
+ int ThreadNumber;
+ zen::Latch* Latch;
+};
-[[nodiscard]] size_t
-WorkerThreadPool::PendingWork() const
+struct WorkerThreadPool::Impl
{
- return m_WorkQueue.Size();
-}
+ void WorkerThreadFunction(ThreadStartInfo Info);
+ std::string m_WorkerThreadBaseName;
+ std::vector<std::thread> m_WorkerThreads;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ {
+ trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str());
+
+ zen::Latch WorkerLatch{InThreadCount};
+
+ for (int i = 0; i < InThreadCount; ++i)
+ {
+ m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch});
+ }
+
+ WorkerLatch.Wait();
+
+ trace::ThreadGroupEnd();
+ }
+
+ ~Impl()
+ {
+ m_WorkQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_WorkerThreads)
+ {
+ Thread.join();
+ }
+
+ m_WorkerThreads.clear();
+ }
+
+ void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); }
+ [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); }
+};
void
-WorkerThreadPool::WorkerThreadFunction()
+WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
{
+ SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber));
+
+ Info.Latch->CountDown();
+
do
{
Ref<IWork> Work;
@@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction()
} while (true);
}
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread")
+{
+}
+
+WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName)
+{
+ m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName);
+}
+
+WorkerThreadPool::~WorkerThreadPool()
+{
+ m_Impl.reset();
+}
+
+void
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+{
+ m_Impl->ScheduleWork(std::move(Work));
+}
+
+void
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+{
+ ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work)));
+}
+
+[[nodiscard]] size_t
+WorkerThreadPool::PendingWorkItemCount() const
+{
+ return m_Impl->PendingWorkItemCount();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+void
+workthreadpool_forcelink()
+{
+}
+
+using namespace std::literals;
+
+TEST_CASE("threadpool.basic")
+{
+ WorkerThreadPool Threadpool{1};
+
+ auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }});
+ auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }});
+ auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }});
+
+ CHECK_EQ(Future42.get(), 42);
+ CHECK_EQ(Future99.get(), 99);
+ CHECK_THROWS(FutureThrow.get());
+}
+
+#endif
+
} // namespace zen