diff options
| author | Stefan Boberg <[email protected]> | 2026-02-20 14:43:32 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-02-20 14:43:32 +0100 |
| commit | c7d033755c1566f7908e3a66bced6b3c3d062d70 (patch) | |
| tree | e0c46b7fcd5599720a9194830b2c229bc6738dd3 | |
| parent | update .gitignore to exclude .claude/ (diff) | |
| download | zen-c7d033755c1566f7908e3a66bced6b3c3d062d70.tar.xz zen-c7d033755c1566f7908e3a66bced6b3c3d062d70.zip | |
added a new I/O thread pool implementation
the new variant manually manages a dynamically growing/shrinking set of threads instead of relying on the built-in Windows thread pool
the benefit of this is that we're in charge of setup and teardown, so we can make better guarantees about the lifetimes of threads, which can help with shutdown issues
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 3 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 53 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/servers/iothreadpool.cpp | 243 | ||||
| -rw-r--r-- | src/zenhttp/servers/iothreadpool.h | 90 | ||||
| -rw-r--r-- | src/zenserver/config/config.cpp | 10 |
7 files changed, 373 insertions, 28 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index f2fe4738f..761665c30 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -1095,7 +1095,8 @@ CreateHttpServerClass(const std::string_view ServerClass, const HttpServerConfig .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled, .IsDedicatedServer = Config.IsDedicatedServer, - .ForceLoopback = Config.ForceLoopback})); + .ForceLoopback = Config.ForceLoopback, + .UseExplicitIoThreadPool = Config.HttpSys.UseExplicitIoThreadPool})); } #endif else if (ServerClass == "null"sv) diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index 350532126..7887beacd 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -249,6 +249,7 @@ struct HttpServerConfig unsigned int AsyncWorkThreadCount = 0; bool IsAsyncResponseEnabled = true; bool IsRequestLoggingEnabled = false; + bool UseExplicitIoThreadPool = false; } HttpSys; }; diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index c640ba90b..6809c280a 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -128,6 +128,7 @@ private: bool m_IsAsyncResponseEnabled = true; std::unique_ptr<WinIoThreadPool> m_IoThreadPool; + bool m_IoThreadPoolIsWinTp = true; RwLock m_AsyncWorkPoolInitLock; WorkerThreadPool* m_AsyncWorkPool = nullptr; @@ -374,7 +375,8 @@ public: void IssueInitialRequest(std::error_code& ErrorCode); bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); - PTP_IO Iocp(); + void StartIo(); + void CancelIo(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } inline HttpSysServer& Server() { return m_HttpServer; } @@ -614,9 +616,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) ZEN_TRACE_CPU("httpsys::Response::IssueRequest"); 
HttpSysTransaction& Tx = Transaction(); HTTP_REQUEST* const HttpReq = Tx.HttpRequest(); - PTP_IO const Iocp = Tx.Iocp(); - StartThreadpoolIo(Iocp); + Tx.StartIo(); // Split payload into batches to play well with the underlying API @@ -822,7 +823,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) // An error occurred, no completion will be posted to IOCP - CancelThreadpoolIo(Iocp); + Tx.CancelIo(); // Emit diagnostics @@ -1001,7 +1002,8 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) MaxThreadCount *= 2; } - m_IoThreadPool = std::make_unique<WinIoThreadPool>(MinThreadCount, MaxThreadCount); + m_IoThreadPoolIsWinTp = !m_InitialConfig.UseExplicitIoThreadPool; + m_IoThreadPool = WinIoThreadPool::Create(!m_IoThreadPoolIsWinTp, MinThreadCount, MaxThreadCount); if (m_InitialConfig.AsyncWorkThreadCount == 0) { @@ -1018,10 +1020,11 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) m_IsHttpInitialized = true; m_IsOk = true; - ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads and {} async worker threads", + ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads ({}) and {} async worker threads", m_InitialConfig.IsDedicatedServer ? "DEDICATED" : "NORMAL", MinThreadCount, MaxThreadCount, + m_InitialConfig.UseExplicitIoThreadPool ? 
"explicit IOCP" : "Windows Thread Pool", m_InitialConfig.AsyncWorkThreadCount); } @@ -1497,10 +1500,16 @@ HttpSysTransaction::~HttpSysTransaction() { } -PTP_IO -HttpSysTransaction::Iocp() +void +HttpSysTransaction::StartIo() +{ + m_HttpServer.m_IoThreadPool->StartIo(); +} + +void +HttpSysTransaction::CancelIo() { - return m_HttpServer.m_IoThreadPool->Iocp(); + m_HttpServer.m_IoThreadPool->CancelIo(); } HANDLE @@ -1521,7 +1530,6 @@ static std::atomic<int> HttpSysThreadIndex = 0; static void NameCurrentHttpSysThread() { - t_IsHttpSysThreadNamed = true; const int ThreadIndex = ++HttpSysThreadIndex; zen::ExtendableStringBuilder<16> ThreadName; ThreadName << "httpio_" << ThreadIndex; @@ -1538,19 +1546,25 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, { ZEN_UNUSED(Io, Instance); - // Assign names to threads for context - - if (!t_IsHttpSysThreadNamed) - { - NameCurrentHttpSysThread(); - } - // Note that for a given transaction we may be in this completion function on more // than one thread at any given moment. 
This means we need to be careful about what // happens in here HttpSysTransaction* Transaction = CONTAINING_RECORD(pOverlapped, HttpSysTransaction, m_HttpOverlapped); + // Assign names to threads for context (only needed when using Windows' native + // thread pool) + + if (Transaction->Server().m_IoThreadPoolIsWinTp) + { + if (!t_IsHttpSysThreadNamed) + { + t_IsHttpSysThreadNamed = true; + + NameCurrentHttpSysThread(); + } + } + if (Transaction->HandleCompletion(IoResult, NumberOfBytesTransferred) == HttpSysTransaction::Status::kDone) { delete Transaction; @@ -2020,10 +2034,9 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode) ZEN_TRACE_CPU("httpsys::Request::IssueRequest"); HttpSysTransaction& Tx = Transaction(); - PTP_IO Iocp = Tx.Iocp(); HTTP_REQUEST* HttpReq = Tx.HttpRequest(); - StartThreadpoolIo(Iocp); + Tx.StartIo(); ULONG HttpApiResult; @@ -2057,7 +2070,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode) if (HttpApiResult != ERROR_IO_PENDING && HttpApiResult != NO_ERROR) { - CancelThreadpoolIo(Iocp); + Tx.CancelIo(); ErrorCode = MakeErrorCode(HttpApiResult); diff --git a/src/zenhttp/servers/httpsys.h b/src/zenhttp/servers/httpsys.h index b2fe7475b..4ff6df1fa 100644 --- a/src/zenhttp/servers/httpsys.h +++ b/src/zenhttp/servers/httpsys.h @@ -22,6 +22,7 @@ struct HttpSysConfig bool IsRequestLoggingEnabled = false; bool IsDedicatedServer = false; bool ForceLoopback = false; + bool UseExplicitIoThreadPool = false; }; Ref<HttpServer> CreateHttpSysServer(HttpSysConfig Config); diff --git a/src/zenhttp/servers/iothreadpool.cpp b/src/zenhttp/servers/iothreadpool.cpp index e941606e2..d180f17f8 100644 --- a/src/zenhttp/servers/iothreadpool.cpp +++ b/src/zenhttp/servers/iothreadpool.cpp @@ -3,12 +3,35 @@ #include "iothreadpool.h" #include <zencore/except.h> +#include <zencore/logging.h> +#include <zencore/thread.h> #if ZEN_PLATFORM_WINDOWS +# include <thread> + namespace zen { -WinIoThreadPool::WinIoThreadPool(int InThreadCount, int 
InMaxThreadCount) +////////////////////////////////////////////////////////////////////////// +// Factory + +std::unique_ptr<WinIoThreadPool> +WinIoThreadPool::Create(bool UseExplicitThreads, int MinThreads, int MaxThreads) +{ + if (UseExplicitThreads) + { + return std::make_unique<ExplicitIoThreadPool>(MinThreads, MaxThreads); + } + else + { + return std::make_unique<WinTpIoThreadPool>(MinThreads, MaxThreads); + } +} + +////////////////////////////////////////////////////////////////////////// +// WinTpIoThreadPool - Windows Thread Pool implementation + +WinTpIoThreadPool::WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount) { ZEN_ASSERT(InThreadCount); @@ -31,7 +54,7 @@ WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount) SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); } -WinIoThreadPool::~WinIoThreadPool() +WinTpIoThreadPool::~WinTpIoThreadPool() { // this will wait for all callbacks to complete and tear down the `CreateThreadpoolIo` // object and release all related objects @@ -42,7 +65,7 @@ WinIoThreadPool::~WinIoThreadPool() } void -WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) +WinTpIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) { ZEN_ASSERT(!m_ThreadPoolIo); @@ -54,6 +77,220 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi } } +void +WinTpIoThreadPool::StartIo() +{ + StartThreadpoolIo(m_ThreadPoolIo); +} + +void +WinTpIoThreadPool::CancelIo() +{ + CancelThreadpoolIo(m_ThreadPoolIo); +} + +////////////////////////////////////////////////////////////////////////// +// ExplicitIoThreadPool - Raw IOCP + std::thread with load-based scaling + +static LoggerRef +ExplicitIoPoolLog() +{ + static LoggerRef s_Log = logging::Get("iopool"); + return s_Log; +} + +ExplicitIoThreadPool::ExplicitIoThreadPool(int InMinThreadCount, int 
InMaxThreadCount) +: m_MinThreads(InMinThreadCount) +, m_MaxThreads(InMaxThreadCount) +{ + ZEN_ASSERT(InMinThreadCount > 0); + + if (m_MaxThreads < m_MinThreads) + { + m_MaxThreads = m_MinThreads; + } + + m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + + if (!m_Iocp) + { + ZEN_LOG_ERROR(ExplicitIoPoolLog(), "failed to create I/O completion port: {}", GetLastError()); + } +} + +ExplicitIoThreadPool::~ExplicitIoThreadPool() +{ + m_ShuttingDown.store(true, std::memory_order::release); + + // Post poison-pill completions to wake all threads + const int ThreadCount = m_TotalThreads.load(std::memory_order::acquire); + for (int i = 0; i < ThreadCount; ++i) + { + PostQueuedCompletionStatus(m_Iocp, 0, 0, nullptr); + } + + // Join all threads + { + RwLock::ExclusiveLockScope _(m_ThreadListLock); + for (auto& Thread : m_Threads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_Threads.clear(); + } + + if (m_Iocp) + { + CloseHandle(m_Iocp); + m_Iocp = nullptr; + } +} + +void +ExplicitIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) +{ + ZEN_ASSERT(m_Iocp); + ZEN_ASSERT(!m_Callback); + + m_Callback = Callback; + m_Context = Context; + + // Associate the I/O handle with our completion port + HANDLE Result = CreateIoCompletionPort(IoHandle, m_Iocp, /* CompletionKey */ 0, 0); + + if (!Result) + { + ErrorCode = MakeErrorCodeFromLastError(); + return; + } + + // Now spawn the initial worker threads + for (int i = 0; i < m_MinThreads; ++i) + { + SpawnWorkerThread(); + } +} + +void +ExplicitIoThreadPool::StartIo() +{ + // No-op for raw IOCP - completions are posted automatically +} + +void +ExplicitIoThreadPool::CancelIo() +{ + // No-op for raw IOCP - completions are posted automatically +} + +void +ExplicitIoThreadPool::SpawnWorkerThread() +{ + RwLock::ExclusiveLockScope _(m_ThreadListLock); + + ++m_TotalThreads; + m_Threads.emplace_back([this] { WorkerThreadMain(); }); +} + +void 
+ExplicitIoThreadPool::WorkerThreadMain() +{ + static std::atomic<int> s_ThreadIndex{0}; + const int ThreadIndex = ++s_ThreadIndex; + ExtendableStringBuilder<16> ThreadName; + ThreadName << "xpio_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + + static constexpr DWORD kIdleTimeoutMs = 15000; + + while (!m_ShuttingDown.load(std::memory_order::acquire)) + { + DWORD BytesTransferred = 0; + ULONG_PTR CompletionKey = 0; + OVERLAPPED* pOverlapped = nullptr; + + BOOL Success = GetQueuedCompletionStatus(m_Iocp, &BytesTransferred, &CompletionKey, &pOverlapped, kIdleTimeoutMs); + + if (m_ShuttingDown.load(std::memory_order::acquire)) + { + break; + } + + if (!Success && !pOverlapped) + { + DWORD Error = GetLastError(); + + if (Error == WAIT_TIMEOUT) + { + // Timeout - consider scaling down + const int CurrentTotal = m_TotalThreads.load(std::memory_order::acquire); + if (CurrentTotal > m_MinThreads) + { + // Try to claim this thread for exit by decrementing the count. + // Use CAS to avoid thundering herd of exits. 
+ int Expected = CurrentTotal; + if (m_TotalThreads.compare_exchange_strong(Expected, CurrentTotal - 1, std::memory_order::acq_rel)) + { + ZEN_LOG_DEBUG(ExplicitIoPoolLog(), + "scaling down I/O thread (idle timeout), {} threads remaining", + CurrentTotal - 1); + return; // Thread exits + } + } + continue; + } + + // Some other error with no overlapped - unexpected + ZEN_LOG_WARN(ExplicitIoPoolLog(), "GetQueuedCompletionStatus failed with error {}", Error); + continue; + } + + if (!pOverlapped) + { + // Poison pill (null overlapped) - shutdown signal + break; + } + + // Got a real completion - determine the I/O result + ULONG IoResult = NO_ERROR; + if (!Success) + { + IoResult = GetLastError(); + } + + // Track active threads for scale-up decisions + const int ActiveBefore = m_ActiveCount.fetch_add(1, std::memory_order::acq_rel); + const int TotalNow = m_TotalThreads.load(std::memory_order::acquire); + + // Scale up: if all threads are now busy and we haven't hit the max, spawn another + if ((ActiveBefore + 1) >= TotalNow && TotalNow < m_MaxThreads) + { + // Use CAS to ensure only one thread triggers the scale-up + int Expected = TotalNow; + if (m_TotalThreads.compare_exchange_strong(Expected, TotalNow + 1, std::memory_order::acq_rel)) + { + ZEN_LOG_DEBUG(ExplicitIoPoolLog(), "scaling up I/O thread pool, {} -> {} threads", TotalNow, TotalNow + 1); + + // Spawn outside the hot path - but we need the thread list lock + // We already incremented m_TotalThreads, so do the actual spawn + { + RwLock::ExclusiveLockScope _(m_ThreadListLock); + m_Threads.emplace_back([this] { WorkerThreadMain(); }); + } + } + } + + // Invoke the callback with the same signature as PTP_WIN32_IO_CALLBACK + // Parameters: Instance, Context, Overlapped, IoResult, NumberOfBytesTransferred, Io + m_Callback(nullptr, m_Context, pOverlapped, IoResult, BytesTransferred, nullptr); + + m_ActiveCount.fetch_sub(1, std::memory_order::release); + } +} + } // namespace zen #endif diff --git 
a/src/zenhttp/servers/iothreadpool.h b/src/zenhttp/servers/iothreadpool.h index e75e95e58..e2c15ba76 100644 --- a/src/zenhttp/servers/iothreadpool.h +++ b/src/zenhttp/servers/iothreadpool.h @@ -4,21 +4,66 @@ #include <zencore/zencore.h> +#include <zencore/thread.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> +# include <memory> # include <system_error> namespace zen { +/** + * @brief Abstract base class for I/O thread pools used by the http.sys server. + * + * Two implementations are available: + * - WinTpIoThreadPool: Uses the Windows Thread Pool API (default, current behavior) + * - ExplicitIoThreadPool: Uses raw IOCP + std::thread with load-based scaling + */ class WinIoThreadPool { public: - WinIoThreadPool(int InThreadCount, int InMaxThreadCount); - ~WinIoThreadPool(); + virtual ~WinIoThreadPool() = default; + + /** + * @brief Bind an I/O handle to this pool with the given callback + */ + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) = 0; + + /** + * @brief Called before issuing an async I/O operation. + * The Windows TP implementation calls StartThreadpoolIo; the explicit implementation is a no-op. + */ + virtual void StartIo() = 0; - void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode); - inline PTP_IO Iocp() const { return m_ThreadPoolIo; } + /** + * @brief Called when an async I/O operation fails synchronously (to undo StartIo). + * The Windows TP implementation calls CancelThreadpoolIo; the explicit implementation is a no-op. 
+ */ + virtual void CancelIo() = 0; + + /** + * @brief Factory method to create an I/O thread pool + * @param UseExplicitThreads If true, creates an ExplicitIoThreadPool; otherwise creates a WinTpIoThreadPool + * @param MinThreads Minimum number of threads + * @param MaxThreads Maximum number of threads + */ + static std::unique_ptr<WinIoThreadPool> Create(bool UseExplicitThreads, int MinThreads, int MaxThreads); +}; + +/** + * @brief Windows Thread Pool implementation (wraps CreateThreadpool/CreateThreadpoolIo) + */ +class WinTpIoThreadPool : public WinIoThreadPool +{ +public: + WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount); + ~WinTpIoThreadPool(); + + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override; + virtual void StartIo() override; + virtual void CancelIo() override; private: PTP_POOL m_ThreadPool = nullptr; @@ -27,5 +72,42 @@ private: TP_CALLBACK_ENVIRON m_CallbackEnvironment; }; +/** + * @brief Explicit IOCP + std::thread implementation with load-based thread scaling + * + * Creates a raw I/O completion port and manages worker threads directly. Threads + * scale up when all are busy servicing completions, and scale down when idle for + * an extended period. 
+ */ +class ExplicitIoThreadPool : public WinIoThreadPool +{ +public: + ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount); + ~ExplicitIoThreadPool(); + + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override; + virtual void StartIo() override; + virtual void CancelIo() override; + +private: + void WorkerThreadMain(); + void SpawnWorkerThread(); + + HANDLE m_Iocp = nullptr; + + PTP_WIN32_IO_CALLBACK m_Callback = nullptr; + void* m_Context = nullptr; + + int m_MinThreads; + int m_MaxThreads; + + std::atomic<int> m_TotalThreads{0}; + std::atomic<int> m_ActiveCount{0}; + std::atomic<bool> m_ShuttingDown{false}; + + RwLock m_ThreadListLock; + std::vector<std::thread> m_Threads; +}; + } // namespace zen #endif diff --git a/src/zenserver/config/config.cpp b/src/zenserver/config/config.cpp index e36352dae..138700df3 100644 --- a/src/zenserver/config/config.cpp +++ b/src/zenserver/config/config.cpp @@ -159,6 +159,9 @@ ZenServerConfiguratorBase::AddCommonConfigOptions(LuaConfig::Options& LuaOptions LuaOptions.AddOption("network.httpsys.requestlogging"sv, ServerOptions.HttpConfig.HttpSys.IsRequestLoggingEnabled, "httpsys-enable-request-logging"sv); + LuaOptions.AddOption("network.httpsys.explicitiopool"sv, + ServerOptions.HttpConfig.HttpSys.UseExplicitIoThreadPool, + "httpsys-explicit-iopool"sv); #endif #if ZEN_WITH_TRACE @@ -330,6 +333,13 @@ ZenServerCmdLineOptions::AddCliOptions(cxxopts::Options& options, ZenServerConfi "Enables Httpsys request logging", cxxopts::value<bool>(ServerOptions.HttpConfig.HttpSys.IsRequestLoggingEnabled), "<httpsys request logging>"); + + options.add_option("httpsys", + "", + "httpsys-explicit-iopool", + "Use explicit IOCP thread pool instead of Windows Thread Pool", + cxxopts::value<bool>(ServerOptions.HttpConfig.HttpSys.UseExplicitIoThreadPool)->default_value("false"), + "<explicit iopool>"); #endif options.add_option("network", |