aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp')
-rw-r--r--src/zenhttp/httpserver.cpp3
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h1
-rw-r--r--src/zenhttp/servers/httpsys.cpp53
-rw-r--r--src/zenhttp/servers/httpsys.h1
-rw-r--r--src/zenhttp/servers/iothreadpool.cpp243
-rw-r--r--src/zenhttp/servers/iothreadpool.h90
6 files changed, 363 insertions, 28 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index f2fe4738f..761665c30 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -1095,7 +1095,8 @@ CreateHttpServerClass(const std::string_view ServerClass, const HttpServerConfig
.IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled,
.IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled,
.IsDedicatedServer = Config.IsDedicatedServer,
- .ForceLoopback = Config.ForceLoopback}));
+ .ForceLoopback = Config.ForceLoopback,
+ .UseExplicitIoThreadPool = Config.HttpSys.UseExplicitIoThreadPool}));
}
#endif
else if (ServerClass == "null"sv)
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index 350532126..7887beacd 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -249,6 +249,7 @@ struct HttpServerConfig
unsigned int AsyncWorkThreadCount = 0;
bool IsAsyncResponseEnabled = true;
bool IsRequestLoggingEnabled = false;
+ bool UseExplicitIoThreadPool = false;
} HttpSys;
};
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index c640ba90b..6809c280a 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -128,6 +128,7 @@ private:
bool m_IsAsyncResponseEnabled = true;
std::unique_ptr<WinIoThreadPool> m_IoThreadPool;
+ bool m_IoThreadPoolIsWinTp = true;
RwLock m_AsyncWorkPoolInitLock;
WorkerThreadPool* m_AsyncWorkPool = nullptr;
@@ -374,7 +375,8 @@ public:
void IssueInitialRequest(std::error_code& ErrorCode);
bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler);
- PTP_IO Iocp();
+ void StartIo();
+ void CancelIo();
HANDLE RequestQueueHandle();
inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; }
inline HttpSysServer& Server() { return m_HttpServer; }
@@ -614,9 +616,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
ZEN_TRACE_CPU("httpsys::Response::IssueRequest");
HttpSysTransaction& Tx = Transaction();
HTTP_REQUEST* const HttpReq = Tx.HttpRequest();
- PTP_IO const Iocp = Tx.Iocp();
- StartThreadpoolIo(Iocp);
+ Tx.StartIo();
// Split payload into batches to play well with the underlying API
@@ -822,7 +823,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
// An error occurred, no completion will be posted to IOCP
- CancelThreadpoolIo(Iocp);
+ Tx.CancelIo();
// Emit diagnostics
@@ -1001,7 +1002,8 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig)
MaxThreadCount *= 2;
}
- m_IoThreadPool = std::make_unique<WinIoThreadPool>(MinThreadCount, MaxThreadCount);
+ m_IoThreadPoolIsWinTp = !m_InitialConfig.UseExplicitIoThreadPool;
+ m_IoThreadPool = WinIoThreadPool::Create(!m_IoThreadPoolIsWinTp, MinThreadCount, MaxThreadCount);
if (m_InitialConfig.AsyncWorkThreadCount == 0)
{
@@ -1018,10 +1020,11 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig)
m_IsHttpInitialized = true;
m_IsOk = true;
- ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads and {} async worker threads",
+ ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads ({}) and {} async worker threads",
m_InitialConfig.IsDedicatedServer ? "DEDICATED" : "NORMAL",
MinThreadCount,
MaxThreadCount,
+ m_InitialConfig.UseExplicitIoThreadPool ? "explicit IOCP" : "Windows Thread Pool",
m_InitialConfig.AsyncWorkThreadCount);
}
@@ -1497,10 +1500,16 @@ HttpSysTransaction::~HttpSysTransaction()
{
}
-PTP_IO
-HttpSysTransaction::Iocp()
+void
+HttpSysTransaction::StartIo()
+{
+ m_HttpServer.m_IoThreadPool->StartIo();
+}
+
+void
+HttpSysTransaction::CancelIo()
{
- return m_HttpServer.m_IoThreadPool->Iocp();
+ m_HttpServer.m_IoThreadPool->CancelIo();
}
HANDLE
@@ -1521,7 +1530,6 @@ static std::atomic<int> HttpSysThreadIndex = 0;
static void
NameCurrentHttpSysThread()
{
- t_IsHttpSysThreadNamed = true;
const int ThreadIndex = ++HttpSysThreadIndex;
zen::ExtendableStringBuilder<16> ThreadName;
ThreadName << "httpio_" << ThreadIndex;
@@ -1538,19 +1546,25 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
{
ZEN_UNUSED(Io, Instance);
- // Assign names to threads for context
-
- if (!t_IsHttpSysThreadNamed)
- {
- NameCurrentHttpSysThread();
- }
-
// Note that for a given transaction we may be in this completion function on more
// than one thread at any given moment. This means we need to be careful about what
// happens in here
HttpSysTransaction* Transaction = CONTAINING_RECORD(pOverlapped, HttpSysTransaction, m_HttpOverlapped);
+ // Assign names to threads for context (only needed when using Windows' native
+ // thread pool)
+
+ if (Transaction->Server().m_IoThreadPoolIsWinTp)
+ {
+ if (!t_IsHttpSysThreadNamed)
+ {
+ t_IsHttpSysThreadNamed = true;
+
+ NameCurrentHttpSysThread();
+ }
+ }
+
if (Transaction->HandleCompletion(IoResult, NumberOfBytesTransferred) == HttpSysTransaction::Status::kDone)
{
delete Transaction;
@@ -2020,10 +2034,9 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode)
ZEN_TRACE_CPU("httpsys::Request::IssueRequest");
HttpSysTransaction& Tx = Transaction();
- PTP_IO Iocp = Tx.Iocp();
HTTP_REQUEST* HttpReq = Tx.HttpRequest();
- StartThreadpoolIo(Iocp);
+ Tx.StartIo();
ULONG HttpApiResult;
@@ -2057,7 +2070,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode)
if (HttpApiResult != ERROR_IO_PENDING && HttpApiResult != NO_ERROR)
{
- CancelThreadpoolIo(Iocp);
+ Tx.CancelIo();
ErrorCode = MakeErrorCode(HttpApiResult);
diff --git a/src/zenhttp/servers/httpsys.h b/src/zenhttp/servers/httpsys.h
index b2fe7475b..4ff6df1fa 100644
--- a/src/zenhttp/servers/httpsys.h
+++ b/src/zenhttp/servers/httpsys.h
@@ -22,6 +22,7 @@ struct HttpSysConfig
bool IsRequestLoggingEnabled = false;
bool IsDedicatedServer = false;
bool ForceLoopback = false;
+ bool UseExplicitIoThreadPool = false;
};
Ref<HttpServer> CreateHttpSysServer(HttpSysConfig Config);
diff --git a/src/zenhttp/servers/iothreadpool.cpp b/src/zenhttp/servers/iothreadpool.cpp
index e941606e2..d180f17f8 100644
--- a/src/zenhttp/servers/iothreadpool.cpp
+++ b/src/zenhttp/servers/iothreadpool.cpp
@@ -3,12 +3,35 @@
#include "iothreadpool.h"
#include <zencore/except.h>
+#include <zencore/logging.h>
+#include <zencore/thread.h>
#if ZEN_PLATFORM_WINDOWS
+# include <thread>
+
namespace zen {
-WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount)
+//////////////////////////////////////////////////////////////////////////
+// Factory
+
+std::unique_ptr<WinIoThreadPool>
+WinIoThreadPool::Create(bool UseExplicitThreads, int MinThreads, int MaxThreads)
+{
+ if (UseExplicitThreads)
+ {
+ return std::make_unique<ExplicitIoThreadPool>(MinThreads, MaxThreads);
+ }
+ else
+ {
+ return std::make_unique<WinTpIoThreadPool>(MinThreads, MaxThreads);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// WinTpIoThreadPool - Windows Thread Pool implementation
+
+WinTpIoThreadPool::WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount)
{
ZEN_ASSERT(InThreadCount);
@@ -31,7 +54,7 @@ WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount)
SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL);
}
-WinIoThreadPool::~WinIoThreadPool()
+WinTpIoThreadPool::~WinTpIoThreadPool()
{
// this will wait for all callbacks to complete and tear down the `CreateThreadpoolIo`
// object and release all related objects
@@ -42,7 +65,7 @@ WinIoThreadPool::~WinIoThreadPool()
}
void
-WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
+WinTpIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
{
ZEN_ASSERT(!m_ThreadPoolIo);
@@ -54,6 +77,220 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi
}
}
void
WinTpIoThreadPool::StartIo()
{
	// Arm the thread-pool I/O object; must be called before each async
	// operation is issued so its completion is delivered to the pool
	StartThreadpoolIo(m_ThreadPoolIo);
}

void
WinTpIoThreadPool::CancelIo()
{
	// Undo a StartIo when the async operation failed synchronously and no
	// completion will be posted
	CancelThreadpoolIo(m_ThreadPoolIo);
}
+
//////////////////////////////////////////////////////////////////////////
// ExplicitIoThreadPool - Raw IOCP + std::thread with load-based scaling

// Lazily-initialized logger for the explicit I/O pool. The function-local
// static gives thread-safe one-time initialization without depending on
// static initialization order across translation units.
static LoggerRef
ExplicitIoPoolLog()
{
	static LoggerRef s_Log = logging::Get("iopool");
	return s_Log;
}
+
// Records the pool bounds and creates the completion port. Worker threads are
// NOT spawned here; they start in CreateIocp() once an I/O handle is bound.
ExplicitIoThreadPool::ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount)
: m_MinThreads(InMinThreadCount)
, m_MaxThreads(InMaxThreadCount)
{
	ZEN_ASSERT(InMinThreadCount > 0);

	// Clamp so the [min, max] range is always well-formed
	if (m_MaxThreads < m_MinThreads)
	{
		m_MaxThreads = m_MinThreads;
	}

	// NumberOfConcurrentThreads = 0 lets the kernel default to the CPU count
	m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

	if (!m_Iocp)
	{
		// NOTE(review): failure is only logged here; m_Iocp stays null and the
		// next use trips ZEN_ASSERT in CreateIocp - consider surfacing an
		// error to the caller instead of continuing
		ZEN_LOG_ERROR(ExplicitIoPoolLog(), "failed to create I/O completion port: {}", GetLastError());
	}
}
+
+ExplicitIoThreadPool::~ExplicitIoThreadPool()
+{
+ m_ShuttingDown.store(true, std::memory_order::release);
+
+ // Post poison-pill completions to wake all threads
+ const int ThreadCount = m_TotalThreads.load(std::memory_order::acquire);
+ for (int i = 0; i < ThreadCount; ++i)
+ {
+ PostQueuedCompletionStatus(m_Iocp, 0, 0, nullptr);
+ }
+
+ // Join all threads
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ for (auto& Thread : m_Threads)
+ {
+ if (Thread.joinable())
+ {
+ Thread.join();
+ }
+ }
+ m_Threads.clear();
+ }
+
+ if (m_Iocp)
+ {
+ CloseHandle(m_Iocp);
+ m_Iocp = nullptr;
+ }
+}
+
+void
+ExplicitIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
+{
+ ZEN_ASSERT(m_Iocp);
+ ZEN_ASSERT(!m_Callback);
+
+ m_Callback = Callback;
+ m_Context = Context;
+
+ // Associate the I/O handle with our completion port
+ HANDLE Result = CreateIoCompletionPort(IoHandle, m_Iocp, /* CompletionKey */ 0, 0);
+
+ if (!Result)
+ {
+ ErrorCode = MakeErrorCodeFromLastError();
+ return;
+ }
+
+ // Now spawn the initial worker threads
+ for (int i = 0; i < m_MinThreads; ++i)
+ {
+ SpawnWorkerThread();
+ }
+}
+
void
ExplicitIoThreadPool::StartIo()
{
	// No-op: raw IOCP has no StartThreadpoolIo equivalent - a packet is
	// queued whenever an overlapped operation completes asynchronously
}

void
ExplicitIoThreadPool::CancelIo()
{
	// No-op: nothing was armed in StartIo, so there is nothing to undo when
	// an operation fails synchronously (callers document that no completion
	// is posted to the IOCP in that case)
}
+
// Spawn one worker under the list lock. The count is incremented before the
// thread object exists so the shutdown/poison-pill accounting never sees a
// running thread that is not yet reflected in m_TotalThreads.
void
ExplicitIoThreadPool::SpawnWorkerThread()
{
	RwLock::ExclusiveLockScope _(m_ThreadListLock);

	++m_TotalThreads;
	m_Threads.emplace_back([this] { WorkerThreadMain(); });
}
+
+void
+ExplicitIoThreadPool::WorkerThreadMain()
+{
+ static std::atomic<int> s_ThreadIndex{0};
+ const int ThreadIndex = ++s_ThreadIndex;
+ ExtendableStringBuilder<16> ThreadName;
+ ThreadName << "xpio_" << ThreadIndex;
+ SetCurrentThreadName(ThreadName);
+
+ static constexpr DWORD kIdleTimeoutMs = 15000;
+
+ while (!m_ShuttingDown.load(std::memory_order::acquire))
+ {
+ DWORD BytesTransferred = 0;
+ ULONG_PTR CompletionKey = 0;
+ OVERLAPPED* pOverlapped = nullptr;
+
+ BOOL Success = GetQueuedCompletionStatus(m_Iocp, &BytesTransferred, &CompletionKey, &pOverlapped, kIdleTimeoutMs);
+
+ if (m_ShuttingDown.load(std::memory_order::acquire))
+ {
+ break;
+ }
+
+ if (!Success && !pOverlapped)
+ {
+ DWORD Error = GetLastError();
+
+ if (Error == WAIT_TIMEOUT)
+ {
+ // Timeout - consider scaling down
+ const int CurrentTotal = m_TotalThreads.load(std::memory_order::acquire);
+ if (CurrentTotal > m_MinThreads)
+ {
+ // Try to claim this thread for exit by decrementing the count.
+ // Use CAS to avoid thundering herd of exits.
+ int Expected = CurrentTotal;
+ if (m_TotalThreads.compare_exchange_strong(Expected, CurrentTotal - 1, std::memory_order::acq_rel))
+ {
+ ZEN_LOG_DEBUG(ExplicitIoPoolLog(),
+ "scaling down I/O thread (idle timeout), {} threads remaining",
+ CurrentTotal - 1);
+ return; // Thread exits
+ }
+ }
+ continue;
+ }
+
+ // Some other error with no overlapped - unexpected
+ ZEN_LOG_WARN(ExplicitIoPoolLog(), "GetQueuedCompletionStatus failed with error {}", Error);
+ continue;
+ }
+
+ if (!pOverlapped)
+ {
+ // Poison pill (null overlapped) - shutdown signal
+ break;
+ }
+
+ // Got a real completion - determine the I/O result
+ ULONG IoResult = NO_ERROR;
+ if (!Success)
+ {
+ IoResult = GetLastError();
+ }
+
+ // Track active threads for scale-up decisions
+ const int ActiveBefore = m_ActiveCount.fetch_add(1, std::memory_order::acq_rel);
+ const int TotalNow = m_TotalThreads.load(std::memory_order::acquire);
+
+ // Scale up: if all threads are now busy and we haven't hit the max, spawn another
+ if ((ActiveBefore + 1) >= TotalNow && TotalNow < m_MaxThreads)
+ {
+ // Use CAS to ensure only one thread triggers the scale-up
+ int Expected = TotalNow;
+ if (m_TotalThreads.compare_exchange_strong(Expected, TotalNow + 1, std::memory_order::acq_rel))
+ {
+ ZEN_LOG_DEBUG(ExplicitIoPoolLog(), "scaling up I/O thread pool, {} -> {} threads", TotalNow, TotalNow + 1);
+
+ // Spawn outside the hot path - but we need the thread list lock
+ // We already incremented m_TotalThreads, so do the actual spawn
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ m_Threads.emplace_back([this] { WorkerThreadMain(); });
+ }
+ }
+ }
+
+ // Invoke the callback with the same signature as PTP_WIN32_IO_CALLBACK
+ // Parameters: Instance, Context, Overlapped, IoResult, NumberOfBytesTransferred, Io
+ m_Callback(nullptr, m_Context, pOverlapped, IoResult, BytesTransferred, nullptr);
+
+ m_ActiveCount.fetch_sub(1, std::memory_order::release);
+ }
+}
+
} // namespace zen
#endif
diff --git a/src/zenhttp/servers/iothreadpool.h b/src/zenhttp/servers/iothreadpool.h
index e75e95e58..e2c15ba76 100644
--- a/src/zenhttp/servers/iothreadpool.h
+++ b/src/zenhttp/servers/iothreadpool.h
@@ -4,21 +4,66 @@
#include <zencore/zencore.h>
+#include <zencore/thread.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
+# include <memory>
# include <system_error>
namespace zen {
+/**
+ * @brief Abstract base class for I/O thread pools used by the http.sys server.
+ *
+ * Two implementations are available:
+ * - WinTpIoThreadPool: Uses the Windows Thread Pool API (default, current behavior)
+ * - ExplicitIoThreadPool: Uses raw IOCP + std::thread with load-based scaling
+ */
class WinIoThreadPool
{
public:
- WinIoThreadPool(int InThreadCount, int InMaxThreadCount);
- ~WinIoThreadPool();
+ virtual ~WinIoThreadPool() = default;
+
+ /**
+ * @brief Bind an I/O handle to this pool with the given callback
+ */
+ virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) = 0;
+
+ /**
+ * @brief Called before issuing an async I/O operation.
+ * The Windows TP implementation calls StartThreadpoolIo; the explicit implementation is a no-op.
+ */
+ virtual void StartIo() = 0;
- void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode);
- inline PTP_IO Iocp() const { return m_ThreadPoolIo; }
+ /**
+ * @brief Called when an async I/O operation fails synchronously (to undo StartIo).
+ * The Windows TP implementation calls CancelThreadpoolIo; the explicit implementation is a no-op.
+ */
+ virtual void CancelIo() = 0;
+
+ /**
+ * @brief Factory method to create an I/O thread pool
+ * @param UseExplicitThreads If true, creates an ExplicitIoThreadPool; otherwise creates a WinTpIoThreadPool
+ * @param MinThreads Minimum number of threads
+ * @param MaxThreads Maximum number of threads
+ */
+ static std::unique_ptr<WinIoThreadPool> Create(bool UseExplicitThreads, int MinThreads, int MaxThreads);
+};
+
+/**
+ * @brief Windows Thread Pool implementation (wraps CreateThreadpool/CreateThreadpoolIo)
+ */
+class WinTpIoThreadPool : public WinIoThreadPool
+{
+public:
+ WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount);
+ ~WinTpIoThreadPool();
+
+ virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override;
+ virtual void StartIo() override;
+ virtual void CancelIo() override;
private:
PTP_POOL m_ThreadPool = nullptr;
@@ -27,5 +72,42 @@ private:
TP_CALLBACK_ENVIRON m_CallbackEnvironment;
};
/**
 * @brief Explicit IOCP + std::thread implementation with load-based thread scaling
 *
 * Creates a raw I/O completion port and manages worker threads directly. Threads
 * scale up when all are busy servicing completions, and scale down when idle for
 * an extended period.
 *
 * NOTE(review): this declaration uses std::thread, std::vector and std::atomic,
 * but this header only adds <memory> - confirm <thread>, <vector> and <atomic>
 * arrive transitively (e.g. via the zencore headers) or include them explicitly.
 */
class ExplicitIoThreadPool : public WinIoThreadPool
{
public:
	ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount);
	~ExplicitIoThreadPool();

	virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override;
	virtual void StartIo() override;
	virtual void CancelIo() override;

private:
	// Completion-dequeue loop executed by each worker thread
	void WorkerThreadMain();
	// Creates one worker thread and registers it in m_Threads
	void SpawnWorkerThread();

	HANDLE m_Iocp = nullptr;  // Raw completion port handle

	PTP_WIN32_IO_CALLBACK m_Callback = nullptr;  // Completion callback (set once in CreateIocp)
	void* m_Context = nullptr;                   // Opaque context forwarded to the callback

	int m_MinThreads;  // Floor for scale-down
	int m_MaxThreads;  // Cap for scale-up

	std::atomic<int> m_TotalThreads{0};  // Threads spawned and not yet scaled down
	std::atomic<int> m_ActiveCount{0};   // Threads currently inside the callback
	std::atomic<bool> m_ShuttingDown{false};

	RwLock m_ThreadListLock;  // Guards m_Threads
	std::vector<std::thread> m_Threads;
};
+
} // namespace zen
#endif