diff options
| author | Stefan Boberg <[email protected]> | 2023-05-15 17:55:52 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-15 17:55:52 +0200 |
| commit | 14f4baabfa56eedb7b71235b951e993da4f641ef (patch) | |
| tree | 86c08838a0179ae15910b791a1412c448340020d /src | |
| parent | v0.2.11 (diff) | |
| download | zen-14f4baabfa56eedb7b71235b951e993da4f641ef.tar.xz zen-14f4baabfa56eedb7b71235b951e993da4f641ef.zip | |
all threads should be named (#304)
* added WorkerThreadPool naming, packaged_task support
* name the http.sys thread pool service threads
* added http.sys I/O threadpool naming
* upstream cache I/O thread naming
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/workthreadpool.h | 46 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 221 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.cpp | 22 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 7 |
5 files changed, 260 insertions, 40 deletions
diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h index 0ddc65298..c10c6ed12 100644 --- a/src/zencore/include/zencore/workthreadpool.h +++ b/src/zencore/include/zencore/workthreadpool.h @@ -4,17 +4,16 @@ #include <zencore/zencore.h> -#include <zencore/blockingqueue.h> #include <zencore/refcount.h> #include <exception> #include <functional> -#include <system_error> -#include <thread> -#include <vector> +#include <future> namespace zen { +////////////////////////////////////////////////////////////////////////// + struct IWork : public RefCounted { virtual void Execute() = 0; @@ -27,22 +26,51 @@ private: friend class WorkerThreadPool; }; +////////////////////////////////////////////////////////////////////////// + class WorkerThreadPool { public: - WorkerThreadPool(int InThreadCount); + explicit WorkerThreadPool(int InThreadCount); + WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName); ~WorkerThreadPool(); void ScheduleWork(Ref<IWork> Work); void ScheduleWork(std::function<void()>&& Work); - [[nodiscard]] size_t PendingWork() const; + template<typename Func> + auto EnqueueTask(std::packaged_task<Func> Task); + + [[nodiscard]] size_t PendingWorkItemCount() const; private: - void WorkerThreadFunction(); + struct Impl; + struct ThreadStartInfo; - std::vector<std::thread> m_WorkerThreads; - BlockingQueue<Ref<IWork>> m_WorkQueue; + std::unique_ptr<Impl> m_Impl; }; +////////////////////////////////////////////////////////////////////////// + +template<typename Func> +auto +WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) +{ + struct FutureWork : IWork + { + FutureWork(std::packaged_task<Func> Task) : m_Task{std::move(Task)} {} + virtual void Execute() override { m_Task(); } + + std::packaged_task<Func> m_Task; + }; + + Ref<FutureWork> Work{new FutureWork(std::move(Task))}; + + auto Future = Work->m_Task.get_future(); + ScheduleWork(std::move(Work)); + return Future; +} + +void workthreadpool_forcelink(); // internal + } // namespace zen diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index b4328cdbd..cc21e717a 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -2,7 +2,21 @@ #include <zencore/workthreadpool.h> +#include <zencore/blockingqueue.h> #include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <thread> +#include <vector> + +#define ZEN_USE_WINDOWS_THREADPOOL 1 + +#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL +# include <zencore/windows.h> +#endif namespace zen { @@ -16,47 +30,143 @@ namespace detail { }; } // namespace detail -WorkerThreadPool::WorkerThreadPool(int InThreadCount) +////////////////////////////////////////////////////////////////////////// + +#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS + +namespace { + thread_local bool t_IsThreadNamed{false}; +} + +struct WorkerThreadPool::Impl { - for (int i = 0; i < InThreadCount; ++i) + PTP_POOL m_ThreadPool = nullptr; + PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; + TP_CALLBACK_ENVIRON m_CallbackEnvironment; + PTP_WORK m_Work = nullptr; + + std::string m_WorkerThreadBaseName; + std::atomic<int> m_WorkerThreadCounter{0}; + + RwLock m_QueueLock; + std::deque<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) { - m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + // Thread pool setup + + m_ThreadPool = CreateThreadpool(NULL); + + SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); + SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + + InitializeThreadpoolEnvironment(&m_CallbackEnvironment); + + m_CleanupGroup = CreateThreadpoolCleanupGroup(); + + SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool); + SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); + + m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment); } -} -WorkerThreadPool::~WorkerThreadPool() -{ - m_WorkQueue.CompleteAdding(); + ~Impl() + { + WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE); + CloseThreadpoolWork(m_Work); + } - for (std::thread& Thread : m_WorkerThreads) + void ScheduleWork(Ref<IWork> Work) { - Thread.join(); + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + SubmitThreadpoolWork(m_Work); } + [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } - m_WorkerThreads.clear(); -} + static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) + { + ZEN_UNUSED(Instance, Work); + Impl* ThisPtr = reinterpret_cast<Impl*>(Context); + ThisPtr->DoWork(); + } -void -WorkerThreadPool::ScheduleWork(Ref<IWork> Work) -{ - m_WorkQueue.Enqueue(std::move(Work)); -} + void DoWork() + { + if (!t_IsThreadNamed) + { + t_IsThreadNamed = true; + const int ThreadIndex = ++m_WorkerThreadCounter; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } -void -WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) + Ref<IWork> WorkFromQueue; + + { + RwLock::ExclusiveLockScope _{m_QueueLock}; + WorkFromQueue = std::move(m_WorkQueue.front()); + m_WorkQueue.pop_front(); + } + + WorkFromQueue->Execute(); + } +}; + +#else + +struct WorkerThreadPool::ThreadStartInfo { - m_WorkQueue.Enqueue(Ref<IWork>(new detail::LambdaWork(Work))); -} + int ThreadNumber; + zen::Latch* Latch; +}; -[[nodiscard]] size_t -WorkerThreadPool::PendingWork() const +struct WorkerThreadPool::Impl { - return m_WorkQueue.Size(); -} + void WorkerThreadFunction(ThreadStartInfo Info); + std::string m_WorkerThreadBaseName; + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + { + trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); + + zen::Latch WorkerLatch{InThreadCount}; + + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); + } + + WorkerLatch.Wait(); + + trace::ThreadGroupEnd(); + } + + ~Impl() + { + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); + } + + void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); } + [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } +}; void -WorkerThreadPool::WorkerThreadFunction() +WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) { + SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber)); + + Info.Latch->CountDown(); + do { Ref<IWork> Work; @@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction() } while (true); } +#endif + +////////////////////////////////////////////////////////////////////////// +WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread") +{ +} + +WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) +{ + m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_Impl.reset(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_Impl->ScheduleWork(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work))); +} + +[[nodiscard]] size_t +WorkerThreadPool::PendingWorkItemCount() const +{ + return m_Impl->PendingWorkItemCount(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +workthreadpool_forcelink() +{ +} + +using namespace std::literals; + +TEST_CASE("threadpool.basic") +{ + WorkerThreadPool Threadpool{1}; + + auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}); + + CHECK_EQ(Future42.get(), 42); + CHECK_EQ(Future99.get(), 99); + CHECK_THROWS(FutureThrow.get()); +} + +#endif + } // namespace zen diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp index 25e4393b3..979f69aeb 100644 --- a/src/zenhttp/httpsys.cpp +++ b/src/zenhttp/httpsys.cpp @@ -178,7 +178,7 @@ class HttpSysServerRequest : public HttpServerRequest { public: HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& Service, IoBuffer PayloadBuffer); - ~HttpSysServerRequest() = default; + ~HttpSysServerRequest(); virtual Oid ParseSessionId() const override; virtual uint32_t ParseRequestId() const override; @@ -722,7 +722,7 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThr : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) -, m_AsyncWorkPool(AsyncWorkThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount, "http_async") { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -1093,6 +1093,9 @@ HttpSysTransaction::IssueInitialRequest(std::error_code& ErrorCode) m_InitialHttpHandler.IssueRequest(ErrorCode); } +thread_local bool t_IsHttpSysThreadNamed = false; +static std::atomic<int> HttpSysThreadIndex = 0; + void HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, PVOID pContext /* HttpSysServer */, @@ -1105,6 +1108,17 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(pContext); + // Assign names to threads for context + + if (!t_IsHttpSysThreadNamed) + { + t_IsHttpSysThreadNamed = true; + const int ThreadIndex = ++HttpSysThreadIndex; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << "httpio_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } + // Note that for a given transaction we may be in this completion function on more // than one thread at any given moment. This means we need to be careful about what // happens in here @@ -1306,6 +1320,10 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& } } +HttpSysServerRequest::~HttpSysServerRequest() +{ +} + Oid HttpSysServerRequest::ParseSessionId() const { diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp index c719b225d..3d29f2228 100644 --- a/src/zenserver/upstream/upstreamapply.cpp +++ b/src/zenserver/upstream/upstreamapply.cpp @@ -188,9 +188,9 @@ public: virtual void GetStatus(CbObjectWriter& Status) override { Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; - Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); + Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount(); Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; - Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); + Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 245097796..2a9e67c06 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1490,7 +1490,7 @@ public: { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this); + m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1); } m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this); @@ -1947,8 +1947,11 @@ private: } } - void ProcessUpstreamQueue() + void ProcessUpstreamQueue(int ThreadIndex) { + std::string ThreadName = fmt::format("upstream_{}", ThreadIndex); + SetCurrentThreadName(ThreadName); + for (;;) { UpstreamCacheRecord CacheRecord; |