aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-15 17:55:52 +0200
committerGitHub <[email protected]>2023-05-15 17:55:52 +0200
commit14f4baabfa56eedb7b71235b951e993da4f641ef (patch)
tree86c08838a0179ae15910b791a1412c448340020d /src
parentv0.2.11 (diff)
downloadzen-14f4baabfa56eedb7b71235b951e993da4f641ef.tar.xz
zen-14f4baabfa56eedb7b71235b951e993da4f641ef.zip
all threads should be named (#304)
* added WorkerThreadPool naming, packaged_task support * name the http.sys thread pool service threads * added http.sys I/O threadpool naming * upstream cache I/O thread naming
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/workthreadpool.h46
-rw-r--r--src/zencore/workthreadpool.cpp221
-rw-r--r--src/zenhttp/httpsys.cpp22
-rw-r--r--src/zenserver/upstream/upstreamapply.cpp4
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp7
5 files changed, 260 insertions, 40 deletions
diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h
index 0ddc65298..c10c6ed12 100644
--- a/src/zencore/include/zencore/workthreadpool.h
+++ b/src/zencore/include/zencore/workthreadpool.h
@@ -4,17 +4,16 @@
#include <zencore/zencore.h>
-#include <zencore/blockingqueue.h>
#include <zencore/refcount.h>
#include <exception>
#include <functional>
-#include <system_error>
-#include <thread>
-#include <vector>
+#include <future>
namespace zen {
+//////////////////////////////////////////////////////////////////////////
+
struct IWork : public RefCounted
{
virtual void Execute() = 0;
@@ -27,22 +26,51 @@ private:
friend class WorkerThreadPool;
};
+//////////////////////////////////////////////////////////////////////////
+
class WorkerThreadPool
{
public:
- WorkerThreadPool(int InThreadCount);
+ explicit WorkerThreadPool(int InThreadCount);
+ WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName);
~WorkerThreadPool();
void ScheduleWork(Ref<IWork> Work);
void ScheduleWork(std::function<void()>&& Work);
- [[nodiscard]] size_t PendingWork() const;
+ template<typename Func>
+ auto EnqueueTask(std::packaged_task<Func> Task);
+
+ [[nodiscard]] size_t PendingWorkItemCount() const;
private:
- void WorkerThreadFunction();
+ struct Impl;
+ struct ThreadStartInfo;
- std::vector<std::thread> m_WorkerThreads;
- BlockingQueue<Ref<IWork>> m_WorkQueue;
+ std::unique_ptr<Impl> m_Impl;
};
+//////////////////////////////////////////////////////////////////////////
+
+template<typename Func>
+auto
+WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task)
+{
+ struct FutureWork : IWork
+ {
+ FutureWork(std::packaged_task<Func> Task) : m_Task{std::move(Task)} {}
+ virtual void Execute() override { m_Task(); }
+
+ std::packaged_task<Func> m_Task;
+ };
+
+ Ref<FutureWork> Work{new FutureWork(std::move(Task))};
+
+ auto Future = Work->m_Task.get_future();
+ ScheduleWork(std::move(Work));
+ return Future;
+}
+
+void workthreadpool_forcelink(); // internal
+
} // namespace zen
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index b4328cdbd..cc21e717a 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -2,7 +2,21 @@
#include <zencore/workthreadpool.h>
+#include <zencore/blockingqueue.h>
#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <thread>
+#include <vector>
+
+#define ZEN_USE_WINDOWS_THREADPOOL 1
+
+#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL
+# include <zencore/windows.h>
+#endif
namespace zen {
@@ -16,47 +30,143 @@ namespace detail {
};
} // namespace detail
-WorkerThreadPool::WorkerThreadPool(int InThreadCount)
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS
+
+namespace {
+ thread_local bool t_IsThreadNamed{false};
+}
+
+struct WorkerThreadPool::Impl
{
- for (int i = 0; i < InThreadCount; ++i)
+ PTP_POOL m_ThreadPool = nullptr;
+ PTP_CLEANUP_GROUP m_CleanupGroup = nullptr;
+ TP_CALLBACK_ENVIRON m_CallbackEnvironment;
+ PTP_WORK m_Work = nullptr;
+
+ std::string m_WorkerThreadBaseName;
+ std::atomic<int> m_WorkerThreadCounter{0};
+
+ RwLock m_QueueLock;
+ std::deque<Ref<IWork>> m_WorkQueue;
+
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
{
- m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this);
+ // Thread pool setup
+
+ m_ThreadPool = CreateThreadpool(NULL);
+
+ SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount);
+ SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2);
+
+ InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
+
+ m_CleanupGroup = CreateThreadpoolCleanupGroup();
+
+ SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool);
+ SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL);
+
+ m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment);
}
-}
-WorkerThreadPool::~WorkerThreadPool()
-{
- m_WorkQueue.CompleteAdding();
+ ~Impl()
+ {
+ WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE);
+ CloseThreadpoolWork(m_Work);
+ }
- for (std::thread& Thread : m_WorkerThreads)
+ void ScheduleWork(Ref<IWork> Work)
{
- Thread.join();
+ m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ SubmitThreadpoolWork(m_Work);
}
+ [[nodiscard]] size_t PendingWorkItemCount() const { return 0; }
- m_WorkerThreads.clear();
-}
+ static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
+ {
+ ZEN_UNUSED(Instance, Work);
+ Impl* ThisPtr = reinterpret_cast<Impl*>(Context);
+ ThisPtr->DoWork();
+ }
-void
-WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
-{
- m_WorkQueue.Enqueue(std::move(Work));
-}
+ void DoWork()
+ {
+ if (!t_IsThreadNamed)
+ {
+ t_IsThreadNamed = true;
+ const int ThreadIndex = ++m_WorkerThreadCounter;
+ zen::ExtendableStringBuilder<128> ThreadName;
+ ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex;
+ SetCurrentThreadName(ThreadName);
+ }
-void
-WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+ Ref<IWork> WorkFromQueue;
+
+ {
+ RwLock::ExclusiveLockScope _{m_QueueLock};
+ WorkFromQueue = std::move(m_WorkQueue.front());
+ m_WorkQueue.pop_front();
+ }
+
+ WorkFromQueue->Execute();
+ }
+};
+
+#else
+
+struct WorkerThreadPool::ThreadStartInfo
{
- m_WorkQueue.Enqueue(Ref<IWork>(new detail::LambdaWork(Work)));
-}
+ int ThreadNumber;
+ zen::Latch* Latch;
+};
-[[nodiscard]] size_t
-WorkerThreadPool::PendingWork() const
+struct WorkerThreadPool::Impl
{
- return m_WorkQueue.Size();
-}
+ void WorkerThreadFunction(ThreadStartInfo Info);
+ std::string m_WorkerThreadBaseName;
+ std::vector<std::thread> m_WorkerThreads;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ {
+ trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str());
+
+ zen::Latch WorkerLatch{InThreadCount};
+
+ for (int i = 0; i < InThreadCount; ++i)
+ {
+ m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch});
+ }
+
+ WorkerLatch.Wait();
+
+ trace::ThreadGroupEnd();
+ }
+
+ ~Impl()
+ {
+ m_WorkQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_WorkerThreads)
+ {
+ Thread.join();
+ }
+
+ m_WorkerThreads.clear();
+ }
+
+ void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); }
+ [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); }
+};
void
-WorkerThreadPool::WorkerThreadFunction()
+WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
{
+ SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber));
+
+ Info.Latch->CountDown();
+
do
{
Ref<IWork> Work;
@@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction()
} while (true);
}
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread")
+{
+}
+
+WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName)
+{
+ m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName);
+}
+
+WorkerThreadPool::~WorkerThreadPool()
+{
+ m_Impl.reset();
+}
+
+void
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+{
+ m_Impl->ScheduleWork(std::move(Work));
+}
+
+void
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+{
+ ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work)));
+}
+
+[[nodiscard]] size_t
+WorkerThreadPool::PendingWorkItemCount() const
+{
+ return m_Impl->PendingWorkItemCount();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+void
+workthreadpool_forcelink()
+{
+}
+
+using namespace std::literals;
+
+TEST_CASE("threadpool.basic")
+{
+ WorkerThreadPool Threadpool{1};
+
+ auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }});
+ auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }});
+ auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }});
+
+ CHECK_EQ(Future42.get(), 42);
+ CHECK_EQ(Future99.get(), 99);
+ CHECK_THROWS(FutureThrow.get());
+}
+
+#endif
+
} // namespace zen
diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp
index 25e4393b3..979f69aeb 100644
--- a/src/zenhttp/httpsys.cpp
+++ b/src/zenhttp/httpsys.cpp
@@ -178,7 +178,7 @@ class HttpSysServerRequest : public HttpServerRequest
{
public:
HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& Service, IoBuffer PayloadBuffer);
- ~HttpSysServerRequest() = default;
+ ~HttpSysServerRequest();
virtual Oid ParseSessionId() const override;
virtual uint32_t ParseRequestId() const override;
@@ -722,7 +722,7 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThr
: m_Log(logging::Get("http"))
, m_RequestLog(logging::Get("http_requests"))
, m_ThreadPool(ThreadCount)
-, m_AsyncWorkPool(AsyncWorkThreadCount)
+, m_AsyncWorkPool(AsyncWorkThreadCount, "http_async")
{
ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr);
@@ -1093,6 +1093,9 @@ HttpSysTransaction::IssueInitialRequest(std::error_code& ErrorCode)
m_InitialHttpHandler.IssueRequest(ErrorCode);
}
+thread_local bool t_IsHttpSysThreadNamed = false;
+static std::atomic<int> HttpSysThreadIndex = 0;
+
void
HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
PVOID pContext /* HttpSysServer */,
@@ -1105,6 +1108,17 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
UNREFERENCED_PARAMETER(Instance);
UNREFERENCED_PARAMETER(pContext);
+ // Assign names to threads for context
+
+ if (!t_IsHttpSysThreadNamed)
+ {
+ t_IsHttpSysThreadNamed = true;
+ const int ThreadIndex = ++HttpSysThreadIndex;
+ zen::ExtendableStringBuilder<128> ThreadName;
+ ThreadName << "httpio_" << ThreadIndex;
+ SetCurrentThreadName(ThreadName);
+ }
+
// Note that for a given transaction we may be in this completion function on more
// than one thread at any given moment. This means we need to be careful about what
// happens in here
@@ -1306,6 +1320,10 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService&
}
}
+HttpSysServerRequest::~HttpSysServerRequest()
+{
+}
+
Oid
HttpSysServerRequest::ParseSessionId() const
{
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp
index c719b225d..3d29f2228 100644
--- a/src/zenserver/upstream/upstreamapply.cpp
+++ b/src/zenserver/upstream/upstreamapply.cpp
@@ -188,9 +188,9 @@ public:
virtual void GetStatus(CbObjectWriter& Status) override
{
Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
- Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork();
+ Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount();
Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
- Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork();
+ Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount();
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 245097796..2a9e67c06 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1490,7 +1490,7 @@ public:
{
for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
- m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this);
+ m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1);
}
m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this);
@@ -1947,8 +1947,11 @@ private:
}
}
- void ProcessUpstreamQueue()
+ void ProcessUpstreamQueue(int ThreadIndex)
{
+ std::string ThreadName = fmt::format("upstream_{}", ThreadIndex);
+ SetCurrentThreadName(ThreadName);
+
for (;;)
{
UpstreamCacheRecord CacheRecord;