diff options
Diffstat (limited to 'src/zenhttp')
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 3 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 53 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/servers/iothreadpool.cpp | 243 | ||||
| -rw-r--r-- | src/zenhttp/servers/iothreadpool.h | 90 |
6 files changed, 363 insertions, 28 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index f2fe4738f..761665c30 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -1095,7 +1095,8 @@ CreateHttpServerClass(const std::string_view ServerClass, const HttpServerConfig .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled, .IsDedicatedServer = Config.IsDedicatedServer, - .ForceLoopback = Config.ForceLoopback})); + .ForceLoopback = Config.ForceLoopback, + .UseExplicitIoThreadPool = Config.HttpSys.UseExplicitIoThreadPool})); } #endif else if (ServerClass == "null"sv) diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index 350532126..7887beacd 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -249,6 +249,7 @@ struct HttpServerConfig unsigned int AsyncWorkThreadCount = 0; bool IsAsyncResponseEnabled = true; bool IsRequestLoggingEnabled = false; + bool UseExplicitIoThreadPool = false; } HttpSys; }; diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index c640ba90b..6809c280a 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -128,6 +128,7 @@ private: bool m_IsAsyncResponseEnabled = true; std::unique_ptr<WinIoThreadPool> m_IoThreadPool; + bool m_IoThreadPoolIsWinTp = true; RwLock m_AsyncWorkPoolInitLock; WorkerThreadPool* m_AsyncWorkPool = nullptr; @@ -374,7 +375,8 @@ public: void IssueInitialRequest(std::error_code& ErrorCode); bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); - PTP_IO Iocp(); + void StartIo(); + void CancelIo(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } inline HttpSysServer& Server() { return m_HttpServer; } @@ -614,9 +616,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) ZEN_TRACE_CPU("httpsys::Response::IssueRequest"); HttpSysTransaction& Tx = Transaction(); HTTP_REQUEST* const HttpReq = Tx.HttpRequest(); - PTP_IO const Iocp = Tx.Iocp(); - StartThreadpoolIo(Iocp); + Tx.StartIo(); // Split payload into batches to play well with the underlying API @@ -822,7 +823,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) // An error occurred, no completion will be posted to IOCP - CancelThreadpoolIo(Iocp); + Tx.CancelIo(); // Emit diagnostics @@ -1001,7 +1002,8 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) MaxThreadCount *= 2; } - m_IoThreadPool = std::make_unique<WinIoThreadPool>(MinThreadCount, MaxThreadCount); + m_IoThreadPoolIsWinTp = !m_InitialConfig.UseExplicitIoThreadPool; + m_IoThreadPool = WinIoThreadPool::Create(!m_IoThreadPoolIsWinTp, MinThreadCount, MaxThreadCount); if (m_InitialConfig.AsyncWorkThreadCount == 0) { @@ -1018,10 +1020,11 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) m_IsHttpInitialized = true; m_IsOk = true; - ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads and {} async worker threads", + ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads ({}) and {} async worker threads", m_InitialConfig.IsDedicatedServer ? "DEDICATED" : "NORMAL", MinThreadCount, MaxThreadCount, + m_InitialConfig.UseExplicitIoThreadPool ? "explicit IOCP" : "Windows Thread Pool", m_InitialConfig.AsyncWorkThreadCount); } @@ -1497,10 +1500,16 @@ HttpSysTransaction::~HttpSysTransaction() { } -PTP_IO -HttpSysTransaction::Iocp() +void +HttpSysTransaction::StartIo() +{ + m_HttpServer.m_IoThreadPool->StartIo(); +} + +void +HttpSysTransaction::CancelIo() { - return m_HttpServer.m_IoThreadPool->Iocp(); + m_HttpServer.m_IoThreadPool->CancelIo(); } HANDLE @@ -1521,7 +1530,6 @@ static std::atomic<int> HttpSysThreadIndex = 0; static void NameCurrentHttpSysThread() { - t_IsHttpSysThreadNamed = true; const int ThreadIndex = ++HttpSysThreadIndex; zen::ExtendableStringBuilder<16> ThreadName; ThreadName << "httpio_" << ThreadIndex; @@ -1538,19 +1546,25 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, { ZEN_UNUSED(Io, Instance); - // Assign names to threads for context - - if (!t_IsHttpSysThreadNamed) - { - NameCurrentHttpSysThread(); - } - // Note that for a given transaction we may be in this completion function on more // than one thread at any given moment. This means we need to be careful about what // happens in here HttpSysTransaction* Transaction = CONTAINING_RECORD(pOverlapped, HttpSysTransaction, m_HttpOverlapped); + // Assign names to threads for context (only needed when using Windows' native + // thread pool) + + if (Transaction->Server().m_IoThreadPoolIsWinTp) + { + if (!t_IsHttpSysThreadNamed) + { + t_IsHttpSysThreadNamed = true; + + NameCurrentHttpSysThread(); + } + } + if (Transaction->HandleCompletion(IoResult, NumberOfBytesTransferred) == HttpSysTransaction::Status::kDone) { delete Transaction; @@ -2020,10 +2034,9 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode) ZEN_TRACE_CPU("httpsys::Request::IssueRequest"); HttpSysTransaction& Tx = Transaction(); - PTP_IO Iocp = Tx.Iocp(); HTTP_REQUEST* HttpReq = Tx.HttpRequest(); - StartThreadpoolIo(Iocp); + Tx.StartIo(); ULONG HttpApiResult; @@ -2057,7 +2070,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode) if (HttpApiResult != ERROR_IO_PENDING && HttpApiResult != NO_ERROR) { - CancelThreadpoolIo(Iocp); + Tx.CancelIo(); ErrorCode = MakeErrorCode(HttpApiResult); diff --git a/src/zenhttp/servers/httpsys.h b/src/zenhttp/servers/httpsys.h index b2fe7475b..4ff6df1fa 100644 --- a/src/zenhttp/servers/httpsys.h +++ b/src/zenhttp/servers/httpsys.h @@ -22,6 +22,7 @@ struct HttpSysConfig bool IsRequestLoggingEnabled = false; bool IsDedicatedServer = false; bool ForceLoopback = false; + bool UseExplicitIoThreadPool = false; }; Ref<HttpServer> CreateHttpSysServer(HttpSysConfig Config); diff --git a/src/zenhttp/servers/iothreadpool.cpp b/src/zenhttp/servers/iothreadpool.cpp index e941606e2..d180f17f8 100644 --- a/src/zenhttp/servers/iothreadpool.cpp +++ b/src/zenhttp/servers/iothreadpool.cpp @@ -3,12 +3,35 @@ #include "iothreadpool.h" #include <zencore/except.h> +#include <zencore/logging.h> +#include <zencore/thread.h> #if ZEN_PLATFORM_WINDOWS +# include <thread> + namespace zen { -WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount) +////////////////////////////////////////////////////////////////////////// +// Factory + +std::unique_ptr<WinIoThreadPool> +WinIoThreadPool::Create(bool UseExplicitThreads, int MinThreads, int MaxThreads) +{ + if (UseExplicitThreads) + { + return std::make_unique<ExplicitIoThreadPool>(MinThreads, MaxThreads); + } + else + { + return std::make_unique<WinTpIoThreadPool>(MinThreads, MaxThreads); + } +} + +////////////////////////////////////////////////////////////////////////// +// WinTpIoThreadPool - Windows Thread Pool implementation + +WinTpIoThreadPool::WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount) { ZEN_ASSERT(InThreadCount); @@ -31,7 +54,7 @@ WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount) SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); } -WinIoThreadPool::~WinIoThreadPool() +WinTpIoThreadPool::~WinTpIoThreadPool() { // this will wait for all callbacks to complete and tear down the `CreateThreadpoolIo` // object and release all related objects @@ -42,7 +65,7 @@ WinIoThreadPool::~WinIoThreadPool() } void -WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) +WinTpIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) { ZEN_ASSERT(!m_ThreadPoolIo); @@ -54,6 +77,220 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi } } +void +WinTpIoThreadPool::StartIo() +{ + StartThreadpoolIo(m_ThreadPoolIo); +} + +void +WinTpIoThreadPool::CancelIo() +{ + CancelThreadpoolIo(m_ThreadPoolIo); +} + +////////////////////////////////////////////////////////////////////////// +// ExplicitIoThreadPool - Raw IOCP + std::thread with load-based scaling + +static LoggerRef +ExplicitIoPoolLog() +{ + static LoggerRef s_Log = logging::Get("iopool"); + return s_Log; +} + +ExplicitIoThreadPool::ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount) +: m_MinThreads(InMinThreadCount) +, m_MaxThreads(InMaxThreadCount) +{ + ZEN_ASSERT(InMinThreadCount > 0); + + if (m_MaxThreads < m_MinThreads) + { + m_MaxThreads = m_MinThreads; + } + + m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + + if (!m_Iocp) + { + ZEN_LOG_ERROR(ExplicitIoPoolLog(), "failed to create I/O completion port: {}", GetLastError()); + } +} + +ExplicitIoThreadPool::~ExplicitIoThreadPool() +{ + m_ShuttingDown.store(true, std::memory_order::release); + + // Post poison-pill completions to wake all threads + const int ThreadCount = m_TotalThreads.load(std::memory_order::acquire); + for (int i = 0; i < ThreadCount; ++i) + { + PostQueuedCompletionStatus(m_Iocp, 0, 0, nullptr); + } + + // Join all threads + { + RwLock::ExclusiveLockScope _(m_ThreadListLock); + for (auto& Thread : m_Threads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_Threads.clear(); + } + + if (m_Iocp) + { + CloseHandle(m_Iocp); + m_Iocp = nullptr; + } +} + +void +ExplicitIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) +{ + ZEN_ASSERT(m_Iocp); + ZEN_ASSERT(!m_Callback); + + m_Callback = Callback; + m_Context = Context; + + // Associate the I/O handle with our completion port + HANDLE Result = CreateIoCompletionPort(IoHandle, m_Iocp, /* CompletionKey */ 0, 0); + + if (!Result) + { + ErrorCode = MakeErrorCodeFromLastError(); + return; + } + + // Now spawn the initial worker threads + for (int i = 0; i < m_MinThreads; ++i) + { + SpawnWorkerThread(); + } +} + +void +ExplicitIoThreadPool::StartIo() +{ + // No-op for raw IOCP - completions are posted automatically +} + +void +ExplicitIoThreadPool::CancelIo() +{ + // No-op for raw IOCP - completions are posted automatically +} + +void +ExplicitIoThreadPool::SpawnWorkerThread() +{ + RwLock::ExclusiveLockScope _(m_ThreadListLock); + + ++m_TotalThreads; + m_Threads.emplace_back([this] { WorkerThreadMain(); }); +} + +void +ExplicitIoThreadPool::WorkerThreadMain() +{ + static std::atomic<int> s_ThreadIndex{0}; + const int ThreadIndex = ++s_ThreadIndex; + ExtendableStringBuilder<16> ThreadName; + ThreadName << "xpio_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + + static constexpr DWORD kIdleTimeoutMs = 15000; + + while (!m_ShuttingDown.load(std::memory_order::acquire)) + { + DWORD BytesTransferred = 0; + ULONG_PTR CompletionKey = 0; + OVERLAPPED* pOverlapped = nullptr; + + BOOL Success = GetQueuedCompletionStatus(m_Iocp, &BytesTransferred, &CompletionKey, &pOverlapped, kIdleTimeoutMs); + + if (m_ShuttingDown.load(std::memory_order::acquire)) + { + break; + } + + if (!Success && !pOverlapped) + { + DWORD Error = GetLastError(); + + if (Error == WAIT_TIMEOUT) + { + // Timeout - consider scaling down + const int CurrentTotal = m_TotalThreads.load(std::memory_order::acquire); + if (CurrentTotal > m_MinThreads) + { + // Try to claim this thread for exit by decrementing the count. + // Use CAS to avoid thundering herd of exits. + int Expected = CurrentTotal; + if (m_TotalThreads.compare_exchange_strong(Expected, CurrentTotal - 1, std::memory_order::acq_rel)) + { + ZEN_LOG_DEBUG(ExplicitIoPoolLog(), + "scaling down I/O thread (idle timeout), {} threads remaining", + CurrentTotal - 1); + return; // Thread exits + } + } + continue; + } + + // Some other error with no overlapped - unexpected + ZEN_LOG_WARN(ExplicitIoPoolLog(), "GetQueuedCompletionStatus failed with error {}", Error); + continue; + } + + if (!pOverlapped) + { + // Poison pill (null overlapped) - shutdown signal + break; + } + + // Got a real completion - determine the I/O result + ULONG IoResult = NO_ERROR; + if (!Success) + { + IoResult = GetLastError(); + } + + // Track active threads for scale-up decisions + const int ActiveBefore = m_ActiveCount.fetch_add(1, std::memory_order::acq_rel); + const int TotalNow = m_TotalThreads.load(std::memory_order::acquire); + + // Scale up: if all threads are now busy and we haven't hit the max, spawn another + if ((ActiveBefore + 1) >= TotalNow && TotalNow < m_MaxThreads) + { + // Use CAS to ensure only one thread triggers the scale-up + int Expected = TotalNow; + if (m_TotalThreads.compare_exchange_strong(Expected, TotalNow + 1, std::memory_order::acq_rel)) + { + ZEN_LOG_DEBUG(ExplicitIoPoolLog(), "scaling up I/O thread pool, {} -> {} threads", TotalNow, TotalNow + 1); + + // Spawn outside the hot path - but we need the thread list lock + // We already incremented m_TotalThreads, so do the actual spawn + { + RwLock::ExclusiveLockScope _(m_ThreadListLock); + m_Threads.emplace_back([this] { WorkerThreadMain(); }); + } + } + } + + // Invoke the callback with the same signature as PTP_WIN32_IO_CALLBACK + // Parameters: Instance, Context, Overlapped, IoResult, NumberOfBytesTransferred, Io + m_Callback(nullptr, m_Context, pOverlapped, IoResult, BytesTransferred, nullptr); + + m_ActiveCount.fetch_sub(1, std::memory_order::release); + } +} + } // namespace zen #endif diff --git a/src/zenhttp/servers/iothreadpool.h b/src/zenhttp/servers/iothreadpool.h index e75e95e58..e2c15ba76 100644 --- a/src/zenhttp/servers/iothreadpool.h +++ b/src/zenhttp/servers/iothreadpool.h @@ -4,21 +4,66 @@ #include <zencore/zencore.h> +#include <zencore/thread.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> +# include <memory> # include <system_error> namespace zen { +/** + * @brief Abstract base class for I/O thread pools used by the http.sys server. + * + * Two implementations are available: + * - WinTpIoThreadPool: Uses the Windows Thread Pool API (default, current behavior) + * - ExplicitIoThreadPool: Uses raw IOCP + std::thread with load-based scaling + */ class WinIoThreadPool { public: - WinIoThreadPool(int InThreadCount, int InMaxThreadCount); - ~WinIoThreadPool(); + virtual ~WinIoThreadPool() = default; + + /** + * @brief Bind an I/O handle to this pool with the given callback + */ + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) = 0; + + /** + * @brief Called before issuing an async I/O operation. + * The Windows TP implementation calls StartThreadpoolIo; the explicit implementation is a no-op. + */ + virtual void StartIo() = 0; - void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode); - inline PTP_IO Iocp() const { return m_ThreadPoolIo; } + /** + * @brief Called when an async I/O operation fails synchronously (to undo StartIo). + * The Windows TP implementation calls CancelThreadpoolIo; the explicit implementation is a no-op. + */ + virtual void CancelIo() = 0; + + /** + * @brief Factory method to create an I/O thread pool + * @param UseExplicitThreads If true, creates an ExplicitIoThreadPool; otherwise creates a WinTpIoThreadPool + * @param MinThreads Minimum number of threads + * @param MaxThreads Maximum number of threads + */ + static std::unique_ptr<WinIoThreadPool> Create(bool UseExplicitThreads, int MinThreads, int MaxThreads); +}; + +/** + * @brief Windows Thread Pool implementation (wraps CreateThreadpool/CreateThreadpoolIo) + */ +class WinTpIoThreadPool : public WinIoThreadPool +{ +public: + WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount); + ~WinTpIoThreadPool(); + + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override; + virtual void StartIo() override; + virtual void CancelIo() override; private: PTP_POOL m_ThreadPool = nullptr; @@ -27,5 +72,42 @@ private: TP_CALLBACK_ENVIRON m_CallbackEnvironment; }; +/** + * @brief Explicit IOCP + std::thread implementation with load-based thread scaling + * + * Creates a raw I/O completion port and manages worker threads directly. Threads + * scale up when all are busy servicing completions, and scale down when idle for + * an extended period. + */ +class ExplicitIoThreadPool : public WinIoThreadPool +{ +public: + ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount); + ~ExplicitIoThreadPool(); + + virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override; + virtual void StartIo() override; + virtual void CancelIo() override; + +private: + void WorkerThreadMain(); + void SpawnWorkerThread(); + + HANDLE m_Iocp = nullptr; + + PTP_WIN32_IO_CALLBACK m_Callback = nullptr; + void* m_Context = nullptr; + + int m_MinThreads; + int m_MaxThreads; + + std::atomic<int> m_TotalThreads{0}; + std::atomic<int> m_ActiveCount{0}; + std::atomic<bool> m_ShuttingDown{false}; + + RwLock m_ThreadListLock; + std::vector<std::thread> m_Threads; +}; + } // namespace zen #endif |