diff options
| author | Stefan Boberg <[email protected]> | 2026-02-20 15:01:42 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-02-20 15:01:42 +0100 |
| commit | a024a6427f64c455a71edc507e43388265700764 (patch) | |
| tree | 0419f0010020a096f0b3ad63cd0aed747b82fc06 | |
| parent | added new I/O thread pool implementation (diff) | |
| download | zen-a024a6427f64c455a71edc507e43388265700764.tar.xz zen-a024a6427f64c455a71edc507e43388265700764.zip | |
made http.sys async worker thread pool configurable via the same option as the main I/O pool
this is also to gain control over threads to improve shutdown behaviour
| -rw-r--r-- | src/zencore/include/zencore/workthreadpool.h | 4 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 69 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 3 |
3 files changed, 47 insertions, 29 deletions
diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h index 4c38dd651..09c4903d6 100644 --- a/src/zencore/include/zencore/workthreadpool.h +++ b/src/zencore/include/zencore/workthreadpool.h @@ -27,8 +27,8 @@ private: class WorkerThreadPool { public: - explicit WorkerThreadPool(int InThreadCount); - WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName); + explicit WorkerThreadPool(int InThreadCount, bool UseExplicitThreads = false); + WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName, bool UseExplicitThreads = false); ~WorkerThreadPool(); // Decides what to do if there are no free workers in the pool when the work is submitted diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index cb84bbe06..df24d4185 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -18,9 +18,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END -#define ZEN_USE_WINDOWS_THREADPOOL 1 - -#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL +#if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -38,13 +36,21 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS +struct WorkerThreadPool::Impl +{ + virtual ~Impl() = default; + [[nodiscard]] virtual Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_PLATFORM_WINDOWS namespace { thread_local bool t_IsThreadNamed{false}; } -struct WorkerThreadPool::Impl +struct WinTpImpl : WorkerThreadPool::Impl { const int m_ThreadCount = 0; PTP_POOL m_ThreadPool = nullptr; @@ -59,7 +65,7 @@ struct WorkerThreadPool::Impl mutable RwLock m_QueueLock; std::deque<Ref<IWork>> m_WorkQueue; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + WinTpImpl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_ThreadCount(InThreadCount) , m_WorkerThreadBaseName(WorkerThreadBaseName) , m_FreeWorkerCount(m_ThreadCount) @@ -96,14 +102,14 @@ struct WorkerThreadPool::Impl } } - ~Impl() + ~WinTpImpl() override { WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE); CloseThreadpoolWork(m_Work); CloseThreadpool(m_ThreadPool); } - [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) override { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { @@ -130,7 +136,7 @@ struct WorkerThreadPool::Impl static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) { ZEN_UNUSED(Instance, Work); - Impl* ThisPtr = reinterpret_cast<Impl*>(Context); + WinTpImpl* ThisPtr = reinterpret_cast<WinTpImpl*>(Context); ThisPtr->DoWork(); } @@ -175,7 +181,9 @@ struct WorkerThreadPool::Impl } }; -#else +#endif + +////////////////////////////////////////////////////////////////////////// struct WorkerThreadPool::ThreadStartInfo { @@ -183,39 +191,39 @@ struct WorkerThreadPool::ThreadStartInfo zen::Latch* Latch; }; -struct WorkerThreadPool::Impl +struct ExplicitImpl : WorkerThreadPool::Impl { const int m_ThreadCount = 0; - void WorkerThreadFunction(ThreadStartInfo Info); + void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info); std::string m_WorkerThreadBaseName; std::vector<std::thread> m_WorkerThreads; BlockingQueue<Ref<IWork>> m_WorkQueue; std::atomic<int> m_FreeWorkerCount{0}; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + ExplicitImpl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_ThreadCount(InThreadCount) , m_WorkerThreadBaseName(WorkerThreadBaseName) , m_FreeWorkerCount(m_ThreadCount) { -# if ZEN_WITH_TRACE +#if ZEN_WITH_TRACE trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); -# endif +#endif zen::Latch WorkerLatch{m_ThreadCount}; for (int i = 0; i < m_ThreadCount; ++i) { - m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); + m_WorkerThreads.emplace_back(&ExplicitImpl::WorkerThreadFunction, this, WorkerThreadPool::ThreadStartInfo{i + 1, &WorkerLatch}); } WorkerLatch.Wait(); -# if ZEN_WITH_TRACE +#if ZEN_WITH_TRACE trace::ThreadGroupEnd(); -# endif +#endif } - ~Impl() + ~ExplicitImpl() override { m_WorkQueue.CompleteAdding(); @@ -230,7 +238,7 @@ struct WorkerThreadPool::Impl m_WorkerThreads.clear(); } - [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) override { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { @@ -250,7 +258,7 @@ struct WorkerThreadPool::Impl }; void -WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) +ExplicitImpl::WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info) { SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber)); @@ -288,18 +296,27 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) } while (true); } -#endif - ////////////////////////////////////////////////////////////////////////// -WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread") +WorkerThreadPool::WorkerThreadPool(int InThreadCount, bool UseExplicitThreads) +: WorkerThreadPool(InThreadCount, "workerthread", UseExplicitThreads) { } -WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) +WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName, bool UseExplicitThreads) { if (InThreadCount > 0) { - m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); +#if ZEN_PLATFORM_WINDOWS + if (!UseExplicitThreads) + { + m_Impl = std::make_unique<WinTpImpl>(InThreadCount, WorkerThreadBaseName); + } + else +#endif + { + ZEN_UNUSED(UseExplicitThreads); + m_Impl = std::make_unique<ExplicitImpl>(InThreadCount, WorkerThreadBaseName); + } } } diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 6809c280a..4406d0619 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -1332,7 +1332,8 @@ HttpSysServer::WorkPool() if (!m_AsyncWorkPool) { - m_AsyncWorkPool = new WorkerThreadPool(m_InitialConfig.AsyncWorkThreadCount, "http_async"); + m_AsyncWorkPool = + new WorkerThreadPool(m_InitialConfig.AsyncWorkThreadCount, "http_async", m_InitialConfig.UseExplicitIoThreadPool); } } |