aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/servers
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-09 17:43:08 +0100
committerGitHub Enterprise <[email protected]>2026-03-09 17:43:08 +0100
commitb37b34ea6ad906f54e8104526e77ba66aed997da (patch)
treee80ce17d666aff6d2f0d73d4977128ffb4055476 /src/zenhttp/servers
parentadd fallback for zencache multirange (#816) (diff)
downloadzen-b37b34ea6ad906f54e8104526e77ba66aed997da.tar.xz
zen-b37b34ea6ad906f54e8104526e77ba66aed997da.zip
Dashboard overhaul, compute integration (#814)
- **Frontend dashboard overhaul**: Unified compute/main dashboards into a single shared UI. Added new pages for cache, projects, metrics, sessions, info (build/runtime config, system stats). Added live-update via WebSockets with pause control, sortable detail tables, themed styling. Refactored compute/hub/orchestrator pages into modular JS. - **HTTP server fixes and stats**: Fixed http.sys local-only fallback when default port is in use, implemented root endpoint redirect for http.sys, fixed Linux/Mac port reuse. Added /stats endpoint exposing HTTP server metrics (bytes transferred, request rates). Added WebSocket stats tracking. - **OTEL/diagnostics hardening**: Improved OTLP HTTP exporter with better error handling and resilience. Extended diagnostics services configuration. - **Session management**: Added new sessions service with HTTP endpoints for registering, updating, querying, and removing sessions. Includes session log file support. This is still WIP. - **CLI subcommand support**: Added support for commands with subcommands in the zen CLI tool, with improved command dispatch. - **Misc**: Exposed CPU usage/hostname to frontend, fixed JS compact binary float32/float64 decoding, limited projects displayed on front page to 25 sorted by last access, added vscode:// link support. Also contains some fixes from TSAN analysis.
Diffstat (limited to 'src/zenhttp/servers')
-rw-r--r--src/zenhttp/servers/httpasio.cpp104
-rw-r--r--src/zenhttp/servers/httpplugin.cpp2
-rw-r--r--src/zenhttp/servers/httpsys.cpp113
-rw-r--r--src/zenhttp/servers/wsasio.cpp18
-rw-r--r--src/zenhttp/servers/wsasio.h8
-rw-r--r--src/zenhttp/servers/wshttpsys.cpp23
-rw-r--r--src/zenhttp/servers/wshttpsys.h5
-rw-r--r--src/zenhttp/servers/wstest.cpp3
8 files changed, 194 insertions, 82 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index 2cf051d14..f5178ebe8 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -531,6 +531,8 @@ public:
std::atomic<uint64_t> m_TotalBytesReceived{0};
std::atomic<uint64_t> m_TotalBytesSent{0};
+
+ HttpServer* m_HttpServer = nullptr;
};
/**
@@ -949,6 +951,7 @@ private:
void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop);
void CloseConnection();
+ void SendInlineResponse(uint32_t RequestNumber, std::string_view StatusLine, std::string_view Headers = {}, std::string_view Body = {});
HttpAsioServerImpl& m_Server;
asio::streambuf m_RequestBuffer;
@@ -1167,6 +1170,38 @@ HttpServerConnection::CloseConnection()
}
void
+HttpServerConnection::SendInlineResponse(uint32_t RequestNumber,
+ std::string_view StatusLine,
+ std::string_view Headers,
+ std::string_view Body)
+{
+ ExtendableStringBuilder<256> ResponseBuilder;
+ ResponseBuilder << "HTTP/1.1 " << StatusLine << "\r\n";
+ if (!Headers.empty())
+ {
+ ResponseBuilder << Headers;
+ }
+ if (!m_RequestData.IsKeepAlive())
+ {
+ ResponseBuilder << "Connection: close\r\n";
+ }
+ ResponseBuilder << "\r\n";
+ if (!Body.empty())
+ {
+ ResponseBuilder << Body;
+ }
+ auto ResponseView = ResponseBuilder.ToView();
+ IoBuffer ResponseData(IoBuffer::Clone, ResponseView.data(), ResponseView.size());
+ auto Buffer = asio::buffer(ResponseData.GetData(), ResponseData.GetSize());
+ asio::async_write(
+ *m_Socket.get(),
+ Buffer,
+ [Conn = AsSharedPtr(), RequestNumber, Response = std::move(ResponseData)](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
+}
+
+void
HttpServerConnection::HandleRequest()
{
ZEN_MEMSCOPE(GetHttpasioTag());
@@ -1204,7 +1239,9 @@ HttpServerConnection::HandleRequest()
return;
}
- Ref<WsAsioConnection> WsConn(new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler));
+ Conn->m_Server.m_HttpServer->OnWebSocketConnectionOpened();
+ Ref<WsAsioConnection> WsConn(
+ new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler, Conn->m_Server.m_HttpServer));
Ref<WebSocketConnection> WsConnRef(WsConn.Get());
WsHandler->OnWebSocketOpen(std::move(WsConnRef));
@@ -1241,6 +1278,8 @@ HttpServerConnection::HandleRequest()
{
ZEN_TRACE_CPU("asio::HandleRequest");
+ m_Server.m_HttpServer->MarkRequest();
+
auto RemoteEndpoint = m_Socket->remote_endpoint();
bool IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address();
@@ -1378,51 +1417,24 @@ HttpServerConnection::HandleRequest()
}
}
- if (m_RequestData.RequestVerb() == HttpVerb::kHead)
+ // If a default redirect is configured and the request is for the root path, send a 302
+ std::string_view DefaultRedirect = m_Server.m_HttpServer->GetDefaultRedirect();
+ if (!DefaultRedirect.empty() && (m_RequestData.Url() == "/" || m_RequestData.Url().empty()))
{
- std::string_view Response =
- "HTTP/1.1 404 NOT FOUND\r\n"
- "\r\n"sv;
-
- if (!m_RequestData.IsKeepAlive())
- {
- Response =
- "HTTP/1.1 404 NOT FOUND\r\n"
- "Connection: close\r\n"
- "\r\n"sv;
- }
-
- asio::async_write(*m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
- Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
- });
+ ExtendableStringBuilder<128> Headers;
+ Headers << "Location: " << DefaultRedirect << "\r\nContent-Length: 0\r\n";
+ SendInlineResponse(RequestNumber, "302 Found"sv, Headers.ToView());
+ }
+ else if (m_RequestData.RequestVerb() == HttpVerb::kHead)
+ {
+ SendInlineResponse(RequestNumber, "404 NOT FOUND"sv);
}
else
{
- std::string_view Response =
- "HTTP/1.1 404 NOT FOUND\r\n"
- "Content-Length: 23\r\n"
- "Content-Type: text/plain\r\n"
- "\r\n"
- "No suitable route found"sv;
-
- if (!m_RequestData.IsKeepAlive())
- {
- Response =
- "HTTP/1.1 404 NOT FOUND\r\n"
- "Content-Length: 23\r\n"
- "Content-Type: text/plain\r\n"
- "Connection: close\r\n"
- "\r\n"
- "No suitable route found"sv;
- }
-
- asio::async_write(*m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
- Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
- });
+ SendInlineResponse(RequestNumber,
+ "404 NOT FOUND"sv,
+ "Content-Length: 23\r\nContent-Type: text/plain\r\n"sv,
+ "No suitable route found"sv);
}
}
@@ -1448,8 +1460,11 @@ struct HttpAcceptor
m_Acceptor.set_option(exclusive_address(true));
m_AlternateProtocolAcceptor.set_option(exclusive_address(true));
#else // ZEN_PLATFORM_WINDOWS
- m_Acceptor.set_option(asio::socket_base::reuse_address(false));
- m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false));
+ // Allow binding to a port in TIME_WAIT so the server can restart immediately
+ // after a previous instance exits. On Linux this does not allow two processes
+ // to actively listen on the same port simultaneously.
+ m_Acceptor.set_option(asio::socket_base::reuse_address(true));
+ m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(true));
#endif // ZEN_PLATFORM_WINDOWS
m_Acceptor.set_option(asio::ip::tcp::no_delay(true));
@@ -2092,6 +2107,7 @@ HttpAsioServer::HttpAsioServer(const AsioConfig& Config)
: m_InitialConfig(Config)
, m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>())
{
+ m_Impl->m_HttpServer = this;
ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser));
}
diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp
index 021b941bd..4bf8c61bb 100644
--- a/src/zenhttp/servers/httpplugin.cpp
+++ b/src/zenhttp/servers/httpplugin.cpp
@@ -378,6 +378,8 @@ HttpPluginConnectionHandler::HandleRequest()
{
ZEN_TRACE_CPU("http_plugin::HandleRequest");
+ m_Server->MarkRequest();
+
HttpPluginServerRequest Request(m_RequestParser, *Service, m_RequestParser.Body());
const HttpVerb RequestVerb = Request.RequestVerb();
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index cf639c114..dfe6bb6aa 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -451,6 +451,8 @@ public:
inline uint16_t GetResponseCode() const { return m_ResponseCode; }
inline int64_t GetResponseBodySize() const { return m_TotalDataSize; }
+ void SetLocationHeader(std::string_view Location) { m_LocationHeader = Location; }
+
private:
eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks;
uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes
@@ -460,6 +462,7 @@ private:
bool m_IsInitialResponse = true;
HttpContentType m_ContentType = HttpContentType::kBinary;
eastl::fixed_vector<IoBuffer, 16> m_DataBuffers;
+ std::string m_LocationHeader;
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs);
};
@@ -715,6 +718,15 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
ContentTypeHeader->pRawValue = ContentTypeString.data();
ContentTypeHeader->RawValueLength = (USHORT)ContentTypeString.size();
+ // Location header (for redirects)
+
+ if (!m_LocationHeader.empty())
+ {
+ PHTTP_KNOWN_HEADER LocationHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderLocation];
+ LocationHeader->pRawValue = m_LocationHeader.data();
+ LocationHeader->RawValueLength = (USHORT)m_LocationHeader.size();
+ }
+
std::string_view ReasonString = ReasonStringForHttpResultCode(m_ResponseCode);
HttpResponse.StatusCode = m_ResponseCode;
@@ -916,7 +928,10 @@ HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTr
ZEN_UNUSED(IoResult, NumberOfBytesTransferred);
- ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred);
+ ZEN_WARN("Unexpected I/O completion during async work! IoResult: {} ({:#x}), NumberOfBytesTransferred: {}",
+ GetSystemErrorAsString(IoResult),
+ IoResult,
+ NumberOfBytesTransferred);
return this;
}
@@ -1083,7 +1098,10 @@ HttpSysServer::InitializeServer(int BasePort)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
+ ZEN_ERROR("Failed to create server session for '{}': {} ({:#x})",
+ WideToUtf8(WildcardUrlPath),
+ GetSystemErrorAsString(Result),
+ Result);
return 0;
}
@@ -1092,7 +1110,7 @@ HttpSysServer::InitializeServer(int BasePort)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
+ ZEN_ERROR("Failed to create URL group for '{}': {} ({:#x})", WideToUtf8(WildcardUrlPath), GetSystemErrorAsString(Result), Result);
return 0;
}
@@ -1116,7 +1134,9 @@ HttpSysServer::InitializeServer(int BasePort)
if ((Result == ERROR_SHARING_VIOLATION))
{
- ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result);
+ ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying",
+ EffectivePort,
+ GetSystemErrorAsString(Result));
Sleep(500);
Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
@@ -1138,7 +1158,9 @@ HttpSysServer::InitializeServer(int BasePort)
{
for (uint32_t Retries = 0; (Result == ERROR_SHARING_VIOLATION) && (Retries < 3); Retries++)
{
- ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result);
+ ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying",
+ EffectivePort,
+ GetSystemErrorAsString(Result));
Sleep(500);
Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
}
@@ -1173,17 +1195,18 @@ HttpSysServer::InitializeServer(int BasePort)
const std::u8string_view Hosts[] = {u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv};
- ULONG InternalResult = ERROR_SHARING_VIOLATION;
- for (int PortOffset = 0; (InternalResult == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset)
+ bool ShouldRetryNextPort = true;
+ for (int PortOffset = 0; ShouldRetryNextPort && (PortOffset < 10); ++PortOffset)
{
- EffectivePort = BasePort + (PortOffset * 100);
+ EffectivePort = BasePort + (PortOffset * 100);
+ ShouldRetryNextPort = false;
for (const std::u8string_view Host : Hosts)
{
WideStringBuilder<64> LocalUrlPath;
LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(EffectivePort) << u8"/"sv;
- InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
+ ULONG InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
if (InternalResult == NO_ERROR)
{
@@ -1191,11 +1214,25 @@ HttpSysServer::InitializeServer(int BasePort)
m_BaseUris.push_back(LocalUrlPath.c_str());
}
+ else if (InternalResult == ERROR_SHARING_VIOLATION || InternalResult == ERROR_ACCESS_DENIED)
+ {
+ // Port may be owned by another process's wildcard registration (access denied)
+ // or actively in use (sharing violation) — retry on a different port
+ ShouldRetryNextPort = true;
+ }
else
{
- break;
+ ZEN_WARN("Failed to register local handler '{}': {} ({:#x})",
+ WideToUtf8(LocalUrlPath),
+ GetSystemErrorAsString(InternalResult),
+ InternalResult);
}
}
+
+ if (!m_BaseUris.empty())
+ {
+ break;
+ }
}
}
else
@@ -1211,7 +1248,10 @@ HttpSysServer::InitializeServer(int BasePort)
if (m_BaseUris.empty())
{
- ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
+ ZEN_ERROR("Failed to add base URL to URL group for '{}': {} ({:#x})",
+ WideToUtf8(WildcardUrlPath),
+ GetSystemErrorAsString(Result),
+ Result);
return 0;
}
@@ -1229,7 +1269,10 @@ HttpSysServer::InitializeServer(int BasePort)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
+ ZEN_ERROR("Failed to create request queue for '{}': {} ({:#x})",
+ WideToUtf8(m_BaseUris.front()),
+ GetSystemErrorAsString(Result),
+ Result);
return 0;
}
@@ -1241,7 +1284,10 @@ HttpSysServer::InitializeServer(int BasePort)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
+ ZEN_ERROR("Failed to set server binding property for '{}': {} ({:#x})",
+ WideToUtf8(m_BaseUris.front()),
+ GetSystemErrorAsString(Result),
+ Result);
return 0;
}
@@ -1273,7 +1319,7 @@ HttpSysServer::InitializeServer(int BasePort)
if (Result != NO_ERROR)
{
- ZEN_WARN("changing request queue length to {} failed: {}", QueueLength, Result);
+ ZEN_WARN("changing request queue length to {} failed: {} ({:#x})", QueueLength, GetSystemErrorAsString(Result), Result);
}
}
@@ -1295,21 +1341,6 @@ HttpSysServer::InitializeServer(int BasePort)
ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front()));
}
- // This is not available in all Windows SDK versions so for now we can't use recently
- // released functionality. We should investigate how to get more recent SDK releases
- // into the build
-
-# if 0
- if (HttpIsFeatureSupported(/* HttpFeatureHttp3 */ (HTTP_FEATURE_ID) 4))
- {
- ZEN_DEBUG("HTTP3 is available");
- }
- else
- {
- ZEN_DEBUG("HTTP3 is NOT available");
- }
-# endif
-
return EffectivePort;
}
@@ -1695,6 +1726,8 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload)
{
HttpSysServerRequest& ThisRequest = m_HandlerRequest.emplace(*this, Service, Payload);
+ m_HttpServer.MarkRequest();
+
// Default request handling
# if ZEN_WITH_OTEL
@@ -2245,8 +2278,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
if (SendResult == NO_ERROR)
{
- Ref<WsHttpSysConnection> WsConn(
- new WsHttpSysConnection(RequestQueueHandle, RequestId, *WsHandler, Transaction().Iocp()));
+ Transaction().Server().OnWebSocketConnectionOpened();
+ Ref<WsHttpSysConnection> WsConn(new WsHttpSysConnection(RequestQueueHandle,
+ RequestId,
+ *WsHandler,
+ Transaction().Iocp(),
+ &Transaction().Server()));
Ref<WebSocketConnection> WsConnRef(WsConn.Get());
WsHandler->OnWebSocketOpen(std::move(WsConnRef));
@@ -2255,7 +2292,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
return nullptr;
}
- ZEN_WARN("WebSocket 101 send failed: {}", SendResult);
+ ZEN_WARN("WebSocket 101 send failed: {} ({:#x})", GetSystemErrorAsString(SendResult), SendResult);
// WebSocket upgrade failed — return nullptr since ServerRequest()
// was never populated (no InvokeRequestHandler call)
@@ -2330,6 +2367,18 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv);
}
}
+ else
+ {
+ // If a default redirect is configured and the request is for the root path, send a 302
+ std::string_view DefaultRedirect = Transaction().Server().GetDefaultRedirect();
+ std::string_view RawUrl(HttpReq->pRawUrl, HttpReq->RawUrlLength);
+ if (!DefaultRedirect.empty() && (RawUrl == "/" || RawUrl.empty()))
+ {
+ auto* Response = new HttpMessageResponseRequest(Transaction(), 302);
+ Response->SetLocationHeader(DefaultRedirect);
+ return Response;
+ }
+ }
// Unable to route
return new HttpMessageResponseRequest(Transaction(), 404, "No suitable route found"sv);
diff --git a/src/zenhttp/servers/wsasio.cpp b/src/zenhttp/servers/wsasio.cpp
index 3e31b58bc..b2543277a 100644
--- a/src/zenhttp/servers/wsasio.cpp
+++ b/src/zenhttp/servers/wsasio.cpp
@@ -4,6 +4,7 @@
#include "wsframecodec.h"
#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
namespace zen::asio_http {
@@ -16,15 +17,20 @@ WsLog()
//////////////////////////////////////////////////////////////////////////
-WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler)
+WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server)
: m_Socket(std::move(Socket))
, m_Handler(Handler)
+, m_HttpServer(Server)
{
}
WsAsioConnection::~WsAsioConnection()
{
m_IsOpen.store(false);
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketConnectionClosed();
+ }
}
void
@@ -101,6 +107,11 @@ WsAsioConnection::ProcessReceivedData()
m_ReadBuffer.consume(Frame.BytesConsumed);
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed);
+ }
+
switch (Frame.Opcode)
{
case WebSocketOpcode::kText:
@@ -219,6 +230,11 @@ WsAsioConnection::DoClose(uint16_t Code, std::string_view Reason)
void
WsAsioConnection::EnqueueWrite(std::vector<uint8_t> Frame)
{
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketFrameSent(Frame.size());
+ }
+
bool ShouldFlush = false;
m_WriteLock.WithExclusiveLock([&] {
diff --git a/src/zenhttp/servers/wsasio.h b/src/zenhttp/servers/wsasio.h
index d8ffdc00a..e8bb3b1d2 100644
--- a/src/zenhttp/servers/wsasio.h
+++ b/src/zenhttp/servers/wsasio.h
@@ -14,6 +14,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <memory>
#include <vector>
+namespace zen {
+class HttpServer;
+} // namespace zen
+
namespace zen::asio_http {
/**
@@ -27,10 +31,11 @@ namespace zen::asio_http {
* connection alive for the duration of the async operation. The service layer
* also holds a Ref<WebSocketConnection>.
*/
+
class WsAsioConnection : public WebSocketConnection
{
public:
- WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler);
+ WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server);
~WsAsioConnection() override;
/**
@@ -58,6 +63,7 @@ private:
std::unique_ptr<asio::ip::tcp::socket> m_Socket;
IWebSocketHandler& m_Handler;
+ zen::HttpServer* m_HttpServer;
asio::streambuf m_ReadBuffer;
RwLock m_WriteLock;
diff --git a/src/zenhttp/servers/wshttpsys.cpp b/src/zenhttp/servers/wshttpsys.cpp
index 3408b64b3..af320172d 100644
--- a/src/zenhttp/servers/wshttpsys.cpp
+++ b/src/zenhttp/servers/wshttpsys.cpp
@@ -7,6 +7,7 @@
# include "wsframecodec.h"
# include <zencore/logging.h>
+# include <zenhttp/httpserver.h>
namespace zen {
@@ -19,11 +20,16 @@ WsHttpSysLog()
//////////////////////////////////////////////////////////////////////////
-WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp)
+WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle,
+ HTTP_REQUEST_ID RequestId,
+ IWebSocketHandler& Handler,
+ PTP_IO Iocp,
+ HttpServer* Server)
: m_RequestQueueHandle(RequestQueueHandle)
, m_RequestId(RequestId)
, m_Handler(Handler)
, m_Iocp(Iocp)
+, m_HttpServer(Server)
, m_ReadBuffer(8192)
{
m_ReadIoContext.ContextType = HttpSysIoContext::Type::kWebSocketRead;
@@ -40,6 +46,11 @@ WsHttpSysConnection::~WsHttpSysConnection()
{
Disconnect();
}
+
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketConnectionClosed();
+ }
}
void
@@ -174,6 +185,11 @@ WsHttpSysConnection::ProcessReceivedData()
// Remove consumed bytes
m_Accumulated.erase(m_Accumulated.begin(), m_Accumulated.begin() + Frame.BytesConsumed);
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed);
+ }
+
switch (Frame.Opcode)
{
case WebSocketOpcode::kText:
@@ -250,6 +266,11 @@ WsHttpSysConnection::ProcessReceivedData()
void
WsHttpSysConnection::EnqueueWrite(std::vector<uint8_t> Frame)
{
+ if (m_HttpServer)
+ {
+ m_HttpServer->OnWebSocketFrameSent(Frame.size());
+ }
+
bool ShouldFlush = false;
{
diff --git a/src/zenhttp/servers/wshttpsys.h b/src/zenhttp/servers/wshttpsys.h
index d854289e0..6015e3873 100644
--- a/src/zenhttp/servers/wshttpsys.h
+++ b/src/zenhttp/servers/wshttpsys.h
@@ -19,6 +19,8 @@
namespace zen {
+class HttpServer;
+
/**
* WebSocket connection over an http.sys opaque-mode connection
*
@@ -37,7 +39,7 @@ namespace zen {
class WsHttpSysConnection : public WebSocketConnection
{
public:
- WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp);
+ WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp, HttpServer* Server);
~WsHttpSysConnection() override;
/**
@@ -75,6 +77,7 @@ private:
HTTP_REQUEST_ID m_RequestId;
IWebSocketHandler& m_Handler;
PTP_IO m_Iocp;
+ HttpServer* m_HttpServer;
// Tagged OVERLAPPED contexts for concurrent read and write
HttpSysIoContext m_ReadIoContext{};
diff --git a/src/zenhttp/servers/wstest.cpp b/src/zenhttp/servers/wstest.cpp
index fd023c490..2134e4ff1 100644
--- a/src/zenhttp/servers/wstest.cpp
+++ b/src/zenhttp/servers/wstest.cpp
@@ -767,13 +767,12 @@ namespace {
void OnWsMessage(const WebSocketMessage& Msg) override
{
- m_MessageCount.fetch_add(1);
-
if (Msg.Opcode == WebSocketOpcode::kText)
{
std::string_view Text(static_cast<const char*>(Msg.Payload.Data()), Msg.Payload.Size());
m_LastMessage = std::string(Text);
}
+ m_MessageCount.fetch_add(1);
}
void OnWsClose(uint16_t Code, [[maybe_unused]] std::string_view Reason) override