aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/servers
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/servers')
-rw-r--r--src/zenhttp/servers/httpsys.cpp58
-rw-r--r--src/zenhttp/servers/httpsys.h1
-rw-r--r--src/zenhttp/servers/iothreadpool.cpp275
-rw-r--r--src/zenhttp/servers/iothreadpool.h92
-rw-r--r--src/zenhttp/servers/wshttpsys.cpp13
-rw-r--r--src/zenhttp/servers/wshttpsys.h9
6 files changed, 411 insertions, 37 deletions
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index f8fb1c9be..67a71c0cc 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -145,6 +145,7 @@ private:
bool m_IsAsyncResponseEnabled = true;
std::unique_ptr<WinIoThreadPool> m_IoThreadPool;
+ bool m_IoThreadPoolIsWinTp = true;
RwLock m_AsyncWorkPoolInitLock;
std::atomic<WorkerThreadPool*> m_AsyncWorkPool = nullptr;
@@ -403,7 +404,8 @@ public:
void IssueInitialRequest(std::error_code& ErrorCode);
bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler);
- PTP_IO Iocp();
+ void StartIo();
+ void CancelIo();
HANDLE RequestQueueHandle();
inline OVERLAPPED* Overlapped() { return &m_IoContext.Overlapped; }
inline HttpSysServer& Server() { return m_HttpServer; }
@@ -646,9 +648,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
ZEN_TRACE_CPU("httpsys::Response::IssueRequest");
HttpSysTransaction& Tx = Transaction();
HTTP_REQUEST* const HttpReq = Tx.HttpRequest();
- PTP_IO const Iocp = Tx.Iocp();
- StartThreadpoolIo(Iocp);
+ Tx.StartIo();
// Split payload into batches to play well with the underlying API
@@ -863,7 +864,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
// An error occurred, no completion will be posted to IOCP
- CancelThreadpoolIo(Iocp);
+ Tx.CancelIo();
// Emit diagnostics
@@ -1045,7 +1046,8 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig)
MaxThreadCount *= 2;
}
- m_IoThreadPool = std::make_unique<WinIoThreadPool>(MinThreadCount, MaxThreadCount);
+ m_IoThreadPoolIsWinTp = !m_InitialConfig.UseExplicitIoThreadPool;
+ m_IoThreadPool = WinIoThreadPool::Create(!m_IoThreadPoolIsWinTp, MinThreadCount, MaxThreadCount);
if (m_InitialConfig.AsyncWorkThreadCount == 0)
{
@@ -1062,10 +1064,11 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig)
m_IsHttpInitialized = true;
m_IsOk = true;
- ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads and {} async worker threads",
+ ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads ({}) and {} async worker threads",
m_InitialConfig.IsDedicatedServer ? "DEDICATED" : "NORMAL",
MinThreadCount,
MaxThreadCount,
+ m_InitialConfig.UseExplicitIoThreadPool ? "explicit IOCP" : "Windows Thread Pool",
m_InitialConfig.AsyncWorkThreadCount);
}
@@ -1664,7 +1667,8 @@ HttpSysServer::WorkPool()
if (!m_AsyncWorkPool.load(std::memory_order_relaxed))
{
- m_AsyncWorkPool.store(new WorkerThreadPool(m_InitialConfig.AsyncWorkThreadCount, "http_async"), std::memory_order_release);
+ m_AsyncWorkPool =
+ new WorkerThreadPool(m_InitialConfig.AsyncWorkThreadCount, "http_async", m_InitialConfig.UseExplicitIoThreadPool);
}
}
@@ -1839,10 +1843,16 @@ HttpSysTransaction::~HttpSysTransaction()
{
}
-PTP_IO
-HttpSysTransaction::Iocp()
+void
+HttpSysTransaction::StartIo()
+{
+ m_HttpServer.m_IoThreadPool->StartIo();
+}
+
+void
+HttpSysTransaction::CancelIo()
{
- return m_HttpServer.m_IoThreadPool->Iocp();
+ m_HttpServer.m_IoThreadPool->CancelIo();
}
HANDLE
@@ -1863,7 +1873,6 @@ static std::atomic<int> HttpSysThreadIndex = 0;
static void
NameCurrentHttpSysThread()
{
- t_IsHttpSysThreadNamed = true;
const int ThreadIndex = ++HttpSysThreadIndex;
zen::ExtendableStringBuilder<16> ThreadName;
ThreadName << "httpio_" << ThreadIndex;
@@ -1880,13 +1889,6 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
{
ZEN_UNUSED(Io, Instance);
- // Assign names to threads for context
-
- if (!t_IsHttpSysThreadNamed)
- {
- NameCurrentHttpSysThread();
- }
-
// Note that for a given transaction we may be in this completion function on more
// than one thread at any given moment. This means we need to be careful about what
// happens in here
@@ -1909,6 +1911,19 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
HttpSysTransaction* Transaction = CONTAINING_RECORD(IoContext, HttpSysTransaction, m_IoContext);
+ // Assign names to threads for context (only needed when using Windows' native
+ // thread pool)
+
+ if (Transaction->Server().m_IoThreadPoolIsWinTp)
+ {
+ if (!t_IsHttpSysThreadNamed)
+ {
+ t_IsHttpSysThreadNamed = true;
+
+ NameCurrentHttpSysThread();
+ }
+ }
+
if (Transaction->HandleCompletion(IoResult, NumberOfBytesTransferred) == HttpSysTransaction::Status::kDone)
{
delete Transaction;
@@ -2391,10 +2406,9 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode)
ZEN_TRACE_CPU("httpsys::Request::IssueRequest");
HttpSysTransaction& Tx = Transaction();
- PTP_IO Iocp = Tx.Iocp();
HTTP_REQUEST* HttpReq = Tx.HttpRequest();
- StartThreadpoolIo(Iocp);
+ Tx.StartIo();
ULONG HttpApiResult;
@@ -2428,7 +2442,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode)
if (HttpApiResult != ERROR_IO_PENDING && HttpApiResult != NO_ERROR)
{
- CancelThreadpoolIo(Iocp);
+ Tx.CancelIo();
ErrorCode = MakeErrorCode(HttpApiResult);
@@ -2574,7 +2588,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
Ref<WsHttpSysConnection> WsConn(new WsHttpSysConnection(RequestQueueHandle,
RequestId,
*WsHandler,
- Transaction().Iocp(),
+ *Transaction().Server().m_IoThreadPool,
&Transaction().Server()));
Ref<WebSocketConnection> WsConnRef(WsConn.Get());
diff --git a/src/zenhttp/servers/httpsys.h b/src/zenhttp/servers/httpsys.h
index ca465ad00..0685b42b2 100644
--- a/src/zenhttp/servers/httpsys.h
+++ b/src/zenhttp/servers/httpsys.h
@@ -22,6 +22,7 @@ struct HttpSysConfig
bool IsRequestLoggingEnabled = false;
bool IsDedicatedServer = false;
bool ForceLoopback = false;
+ bool UseExplicitIoThreadPool = false;
int HttpsPort = 0; // 0 = HTTPS disabled
std::string CertThumbprint; // Hex SHA-1 (40 chars) for auto SSL binding
std::string CertStoreName = "MY"; // Windows certificate store name
diff --git a/src/zenhttp/servers/iothreadpool.cpp b/src/zenhttp/servers/iothreadpool.cpp
index e941606e2..1053f0b0c 100644
--- a/src/zenhttp/servers/iothreadpool.cpp
+++ b/src/zenhttp/servers/iothreadpool.cpp
@@ -3,12 +3,36 @@
#include "iothreadpool.h"
#include <zencore/except.h>
+#include <zencore/logging.h>
+#include <zencore/thread.h>
#if ZEN_PLATFORM_WINDOWS
+# include <algorithm>
+# include <thread>
+
namespace zen {
-WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount)
+//////////////////////////////////////////////////////////////////////////
+// Factory
+
+std::unique_ptr<WinIoThreadPool>
+WinIoThreadPool::Create(bool UseExplicitThreads, int MinThreads, int MaxThreads)
+{
+ if (UseExplicitThreads)
+ {
+ return std::make_unique<ExplicitIoThreadPool>(MinThreads, MaxThreads);
+ }
+ else
+ {
+ return std::make_unique<WinTpIoThreadPool>(MinThreads, MaxThreads);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// WinTpIoThreadPool - Windows Thread Pool implementation
+
+WinTpIoThreadPool::WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount)
{
ZEN_ASSERT(InThreadCount);
@@ -31,7 +55,7 @@ WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount)
SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL);
}
-WinIoThreadPool::~WinIoThreadPool()
+WinTpIoThreadPool::~WinTpIoThreadPool()
{
// this will wait for all callbacks to complete and tear down the `CreateThreadpoolIo`
// object and release all related objects
@@ -42,7 +66,7 @@ WinIoThreadPool::~WinIoThreadPool()
}
void
-WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
+WinTpIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
{
ZEN_ASSERT(!m_ThreadPoolIo);
@@ -54,6 +78,251 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi
}
}
+void
+WinTpIoThreadPool::StartIo()
+{
+ StartThreadpoolIo(m_ThreadPoolIo);
+}
+
+void
+WinTpIoThreadPool::CancelIo()
+{
+ CancelThreadpoolIo(m_ThreadPoolIo);
+}
+
+//////////////////////////////////////////////////////////////////////////
+// ExplicitIoThreadPool - Raw IOCP + std::thread with load-based scaling
+
+static LoggerRef
+ExplicitIoPoolLog()
+{
+ static LoggerRef s_Log = logging::Get("iopool");
+ return s_Log;
+}
+
+ExplicitIoThreadPool::ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount)
+: m_MinThreads(InMinThreadCount)
+, m_MaxThreads(InMaxThreadCount)
+{
+ ZEN_ASSERT(InMinThreadCount > 0);
+
+ if (m_MaxThreads < m_MinThreads)
+ {
+ m_MaxThreads = m_MinThreads;
+ }
+
+ m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+
+ if (!m_Iocp)
+ {
+ ZEN_LOG_ERROR(ExplicitIoPoolLog(), "failed to create I/O completion port: {}", GetLastError());
+ }
+}
+
+ExplicitIoThreadPool::~ExplicitIoThreadPool()
+{
+ m_ShuttingDown.store(true, std::memory_order::release);
+
+ // Post poison-pill completions to wake all threads
+ const int ThreadCount = m_TotalThreads.load(std::memory_order::acquire);
+ for (int i = 0; i < ThreadCount; ++i)
+ {
+ PostQueuedCompletionStatus(m_Iocp, 0, 0, nullptr);
+ }
+
+ // Join all threads
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ for (auto& Thread : m_Threads)
+ {
+ if (Thread.joinable())
+ {
+ Thread.join();
+ }
+ }
+ m_Threads.clear();
+ }
+
+ if (m_Iocp)
+ {
+ CloseHandle(m_Iocp);
+ m_Iocp = nullptr;
+ }
+}
+
+void
+ExplicitIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
+{
+ ZEN_ASSERT(m_Iocp);
+ ZEN_ASSERT(!m_Callback);
+
+ m_Callback = Callback;
+ m_Context = Context;
+
+ // Associate the I/O handle with our completion port
+ HANDLE Result = CreateIoCompletionPort(IoHandle, m_Iocp, /* CompletionKey */ 0, 0);
+
+ if (!Result)
+ {
+ ErrorCode = MakeErrorCodeFromLastError();
+ return;
+ }
+
+ // Now spawn the initial worker threads
+ for (int i = 0; i < m_MinThreads; ++i)
+ {
+ SpawnWorkerThread();
+ }
+}
+
+void
+ExplicitIoThreadPool::StartIo()
+{
+ // No-op for raw IOCP - completions are posted automatically
+}
+
+void
+ExplicitIoThreadPool::CancelIo()
+{
+ // No-op for raw IOCP - completions are posted automatically
+}
+
+void
+ExplicitIoThreadPool::PruneExitedThreads()
+{
+ // Must be called under m_ThreadListLock
+ if (m_ExitedThreadIds.empty())
+ {
+ return;
+ }
+
+ for (auto It = m_Threads.begin(); It != m_Threads.end();)
+ {
+ auto IdIt = std::find(m_ExitedThreadIds.begin(), m_ExitedThreadIds.end(), It->get_id());
+ if (IdIt != m_ExitedThreadIds.end())
+ {
+ It->join();
+ It = m_Threads.erase(It);
+ m_ExitedThreadIds.erase(IdIt);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+ExplicitIoThreadPool::SpawnWorkerThread()
+{
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+
+ PruneExitedThreads();
+ ++m_TotalThreads;
+ m_Threads.emplace_back([this] { WorkerThreadMain(); });
+}
+
+void
+ExplicitIoThreadPool::WorkerThreadMain()
+{
+ static std::atomic<int> s_ThreadIndex{0};
+ const int ThreadIndex = ++s_ThreadIndex;
+ ExtendableStringBuilder<16> ThreadName;
+ ThreadName << "xpio_" << ThreadIndex;
+ SetCurrentThreadName(ThreadName);
+
+ static constexpr DWORD kIdleTimeoutMs = 15000;
+
+ while (!m_ShuttingDown.load(std::memory_order::acquire))
+ {
+ DWORD BytesTransferred = 0;
+ ULONG_PTR CompletionKey = 0;
+ OVERLAPPED* pOverlapped = nullptr;
+
+ BOOL Success = GetQueuedCompletionStatus(m_Iocp, &BytesTransferred, &CompletionKey, &pOverlapped, kIdleTimeoutMs);
+
+ if (m_ShuttingDown.load(std::memory_order::acquire))
+ {
+ break;
+ }
+
+ if (!Success && !pOverlapped)
+ {
+ DWORD Error = GetLastError();
+
+ if (Error == WAIT_TIMEOUT)
+ {
+ // Timeout - consider scaling down
+ const int CurrentTotal = m_TotalThreads.load(std::memory_order::acquire);
+ if (CurrentTotal > m_MinThreads)
+ {
+ // Try to claim this thread for exit by decrementing the count.
+ // Use CAS to avoid thundering herd of exits.
+ int Expected = CurrentTotal;
+ if (m_TotalThreads.compare_exchange_strong(Expected, CurrentTotal - 1, std::memory_order::acq_rel))
+ {
+ ZEN_LOG_DEBUG(ExplicitIoPoolLog(),
+ "scaling down I/O thread (idle timeout), {} threads remaining",
+ CurrentTotal - 1);
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ m_ExitedThreadIds.push_back(std::this_thread::get_id());
+ }
+ return; // Thread exits
+ }
+ }
+ continue;
+ }
+
+ // Some other error with no overlapped - unexpected
+ ZEN_LOG_WARN(ExplicitIoPoolLog(), "GetQueuedCompletionStatus failed with error {}", Error);
+ continue;
+ }
+
+ if (!pOverlapped)
+ {
+ // Poison pill (null overlapped) - shutdown signal
+ break;
+ }
+
+ // Got a real completion - determine the I/O result
+ ULONG IoResult = NO_ERROR;
+ if (!Success)
+ {
+ IoResult = GetLastError();
+ }
+
+ // Track active threads for scale-up decisions
+ const int ActiveBefore = m_ActiveCount.fetch_add(1, std::memory_order::acq_rel);
+ const int TotalNow = m_TotalThreads.load(std::memory_order::acquire);
+
+ // Scale up: if all threads are now busy and we haven't hit the max, spawn another
+ if ((ActiveBefore + 1) >= TotalNow && TotalNow < m_MaxThreads)
+ {
+ // Use CAS to ensure only one thread triggers the scale-up
+ int Expected = TotalNow;
+ if (m_TotalThreads.compare_exchange_strong(Expected, TotalNow + 1, std::memory_order::acq_rel))
+ {
+ ZEN_LOG_DEBUG(ExplicitIoPoolLog(), "scaling up I/O thread pool, {} -> {} threads", TotalNow, TotalNow + 1);
+
+ // Spawn outside the hot path - but we need the thread list lock
+ // We already incremented m_TotalThreads, so do the actual spawn
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ PruneExitedThreads();
+ m_Threads.emplace_back([this] { WorkerThreadMain(); });
+ }
+ }
+ }
+
+ // Invoke the callback with the same signature as PTP_WIN32_IO_CALLBACK
+ // Parameters: Instance, Context, Overlapped, IoResult, NumberOfBytesTransferred, Io
+ m_Callback(nullptr, m_Context, pOverlapped, IoResult, BytesTransferred, nullptr);
+
+ m_ActiveCount.fetch_sub(1, std::memory_order::release);
+ }
+}
+
} // namespace zen
#endif
diff --git a/src/zenhttp/servers/iothreadpool.h b/src/zenhttp/servers/iothreadpool.h
index e75e95e58..f6bfce450 100644
--- a/src/zenhttp/servers/iothreadpool.h
+++ b/src/zenhttp/servers/iothreadpool.h
@@ -4,21 +4,66 @@
#include <zencore/zencore.h>
+#include <zencore/thread.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
+# include <memory>
# include <system_error>
namespace zen {
+/**
+ * @brief Abstract base class for I/O thread pools used by the http.sys server.
+ *
+ * Two implementations are available:
+ * - WinTpIoThreadPool: Uses the Windows Thread Pool API (default, current behavior)
+ * - ExplicitIoThreadPool: Uses raw IOCP + std::thread with load-based scaling
+ */
class WinIoThreadPool
{
public:
- WinIoThreadPool(int InThreadCount, int InMaxThreadCount);
- ~WinIoThreadPool();
+ virtual ~WinIoThreadPool() = default;
+
+ /**
+ * @brief Bind an I/O handle to this pool with the given callback
+ */
+ virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) = 0;
+
+ /**
+ * @brief Called before issuing an async I/O operation.
+ * The Windows TP implementation calls StartThreadpoolIo; the explicit implementation is a no-op.
+ */
+ virtual void StartIo() = 0;
- void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode);
- inline PTP_IO Iocp() const { return m_ThreadPoolIo; }
+ /**
+ * @brief Called when an async I/O operation fails synchronously (to undo StartIo).
+ * The Windows TP implementation calls CancelThreadpoolIo; the explicit implementation is a no-op.
+ */
+ virtual void CancelIo() = 0;
+
+ /**
+ * @brief Factory method to create an I/O thread pool
+ * @param UseExplicitThreads If true, creates an ExplicitIoThreadPool; otherwise creates a WinTpIoThreadPool
+ * @param MinThreads Minimum number of threads
+ * @param MaxThreads Maximum number of threads
+ */
+ static std::unique_ptr<WinIoThreadPool> Create(bool UseExplicitThreads, int MinThreads, int MaxThreads);
+};
+
+/**
+ * @brief Windows Thread Pool implementation (wraps CreateThreadpool/CreateThreadpoolIo)
+ */
+class WinTpIoThreadPool : public WinIoThreadPool
+{
+public:
+ WinTpIoThreadPool(int InThreadCount, int InMaxThreadCount);
+ ~WinTpIoThreadPool();
+
+ virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override;
+ virtual void StartIo() override;
+ virtual void CancelIo() override;
private:
PTP_POOL m_ThreadPool = nullptr;
@@ -27,5 +72,44 @@ private:
TP_CALLBACK_ENVIRON m_CallbackEnvironment;
};
+/**
+ * @brief Explicit IOCP + std::thread implementation with load-based thread scaling
+ *
+ * Creates a raw I/O completion port and manages worker threads directly. Threads
+ * scale up when all are busy servicing completions, and scale down when idle for
+ * an extended period.
+ */
+class ExplicitIoThreadPool : public WinIoThreadPool
+{
+public:
+ ExplicitIoThreadPool(int InMinThreadCount, int InMaxThreadCount);
+ ~ExplicitIoThreadPool();
+
+ virtual void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) override;
+ virtual void StartIo() override;
+ virtual void CancelIo() override;
+
+private:
+ void WorkerThreadMain();
+ void SpawnWorkerThread();
+ void PruneExitedThreads();
+
+ HANDLE m_Iocp = nullptr;
+
+ PTP_WIN32_IO_CALLBACK m_Callback = nullptr;
+ void* m_Context = nullptr;
+
+ int m_MinThreads;
+ int m_MaxThreads;
+
+ std::atomic<int> m_TotalThreads{0};
+ std::atomic<int> m_ActiveCount{0};
+ std::atomic<bool> m_ShuttingDown{false};
+
+ RwLock m_ThreadListLock;
+ std::vector<std::thread> m_Threads;
+ std::vector<std::thread::id> m_ExitedThreadIds;
+};
+
} // namespace zen
#endif
diff --git a/src/zenhttp/servers/wshttpsys.cpp b/src/zenhttp/servers/wshttpsys.cpp
index af320172d..6abf805ec 100644
--- a/src/zenhttp/servers/wshttpsys.cpp
+++ b/src/zenhttp/servers/wshttpsys.cpp
@@ -4,6 +4,7 @@
#if ZEN_WITH_HTTPSYS
+# include "iothreadpool.h"
# include "wsframecodec.h"
# include <zencore/logging.h>
@@ -23,12 +24,12 @@ WsHttpSysLog()
WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle,
HTTP_REQUEST_ID RequestId,
IWebSocketHandler& Handler,
- PTP_IO Iocp,
+ WinIoThreadPool& IoThreadPool,
HttpServer* Server)
: m_RequestQueueHandle(RequestQueueHandle)
, m_RequestId(RequestId)
, m_Handler(Handler)
-, m_Iocp(Iocp)
+, m_IoThreadPool(IoThreadPool)
, m_HttpServer(Server)
, m_ReadBuffer(8192)
{
@@ -98,7 +99,7 @@ WsHttpSysConnection::IssueAsyncRead()
ZeroMemory(&m_ReadIoContext.Overlapped, sizeof(OVERLAPPED));
- StartThreadpoolIo(m_Iocp);
+ m_IoThreadPool.StartIo();
ULONG Result = HttpReceiveRequestEntityBody(m_RequestQueueHandle,
m_RequestId,
@@ -110,7 +111,7 @@ WsHttpSysConnection::IssueAsyncRead()
if (Result != NO_ERROR && Result != ERROR_IO_PENDING)
{
- CancelThreadpoolIo(m_Iocp);
+ m_IoThreadPool.CancelIo();
m_OutstandingOps.fetch_sub(1, std::memory_order_relaxed);
if (m_IsOpen.exchange(false))
@@ -315,7 +316,7 @@ WsHttpSysConnection::FlushWriteQueue()
ZeroMemory(&m_WriteIoContext.Overlapped, sizeof(OVERLAPPED));
- StartThreadpoolIo(m_Iocp);
+ m_IoThreadPool.StartIo();
ULONG Result = HttpSendResponseEntityBody(m_RequestQueueHandle,
m_RequestId,
@@ -330,7 +331,7 @@ WsHttpSysConnection::FlushWriteQueue()
if (Result != NO_ERROR && Result != ERROR_IO_PENDING)
{
- CancelThreadpoolIo(m_Iocp);
+ m_IoThreadPool.CancelIo();
m_OutstandingOps.fetch_sub(1, std::memory_order_relaxed);
ZEN_LOG_DEBUG(WsHttpSysLog(), "WebSocket async write failed: {}", Result);
diff --git a/src/zenhttp/servers/wshttpsys.h b/src/zenhttp/servers/wshttpsys.h
index 6015e3873..7b10fe563 100644
--- a/src/zenhttp/servers/wshttpsys.h
+++ b/src/zenhttp/servers/wshttpsys.h
@@ -20,6 +20,7 @@
namespace zen {
class HttpServer;
+class WinIoThreadPool;
/**
* WebSocket connection over an http.sys opaque-mode connection
@@ -39,7 +40,11 @@ class HttpServer;
class WsHttpSysConnection : public WebSocketConnection
{
public:
- WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp, HttpServer* Server);
+ WsHttpSysConnection(HANDLE RequestQueueHandle,
+ HTTP_REQUEST_ID RequestId,
+ IWebSocketHandler& Handler,
+ WinIoThreadPool& IoThreadPool,
+ HttpServer* Server);
~WsHttpSysConnection() override;
/**
@@ -76,7 +81,7 @@ private:
HANDLE m_RequestQueueHandle;
HTTP_REQUEST_ID m_RequestId;
IWebSocketHandler& m_Handler;
- PTP_IO m_Iocp;
+ WinIoThreadPool& m_IoThreadPool;
HttpServer* m_HttpServer;
// Tagged OVERLAPPED contexts for concurrent read and write