diff options
| author | Stefan Boberg <[email protected]> | 2026-03-09 17:43:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-09 17:43:08 +0100 |
| commit | b37b34ea6ad906f54e8104526e77ba66aed997da (patch) | |
| tree | e80ce17d666aff6d2f0d73d4977128ffb4055476 /src/zenhttp/servers | |
| parent | add fallback for zencache multirange (#816) (diff) | |
| download | zen-b37b34ea6ad906f54e8104526e77ba66aed997da.tar.xz zen-b37b34ea6ad906f54e8104526e77ba66aed997da.zip | |
Dashboard overhaul, compute integration (#814)
- **Frontend dashboard overhaul**: Unified compute/main dashboards into a single shared UI. Added new pages for cache, projects, metrics, sessions, info (build/runtime config, system stats). Added live-update via WebSockets with pause control, sortable detail tables, themed styling. Refactored compute/hub/orchestrator pages into modular JS.
- **HTTP server fixes and stats**: Fixed http.sys local-only fallback when default port is in use, implemented root endpoint redirect for http.sys, fixed Linux/Mac port reuse. Added /stats endpoint exposing HTTP server metrics (bytes transferred, request rates). Added WebSocket stats tracking.
- **OTEL/diagnostics hardening**: Improved OTLP HTTP exporter with better error handling and resilience. Extended diagnostics services configuration.
- **Session management**: Added new sessions service with HTTP endpoints for registering, updating, querying, and removing sessions. Includes session log file support. This is still WIP.
- **CLI subcommand support**: Added support for commands with subcommands in the zen CLI tool, with improved command dispatch.
- **Misc**: Exposed CPU usage/hostname to frontend, fixed JS compact binary float32/float64 decoding, limited projects displayed on front page to 25 sorted by last access, added vscode:// link support.
Also contains some fixes from TSAN analysis.
Diffstat (limited to 'src/zenhttp/servers')
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 104 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpplugin.cpp | 2 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 113 | ||||
| -rw-r--r-- | src/zenhttp/servers/wsasio.cpp | 18 | ||||
| -rw-r--r-- | src/zenhttp/servers/wsasio.h | 8 | ||||
| -rw-r--r-- | src/zenhttp/servers/wshttpsys.cpp | 23 | ||||
| -rw-r--r-- | src/zenhttp/servers/wshttpsys.h | 5 | ||||
| -rw-r--r-- | src/zenhttp/servers/wstest.cpp | 3 |
8 files changed, 194 insertions, 82 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index 2cf051d14..f5178ebe8 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -531,6 +531,8 @@ public: std::atomic<uint64_t> m_TotalBytesReceived{0}; std::atomic<uint64_t> m_TotalBytesSent{0}; + + HttpServer* m_HttpServer = nullptr; }; /** @@ -949,6 +951,7 @@ private: void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); + void SendInlineResponse(uint32_t RequestNumber, std::string_view StatusLine, std::string_view Headers = {}, std::string_view Body = {}); HttpAsioServerImpl& m_Server; asio::streambuf m_RequestBuffer; @@ -1167,6 +1170,38 @@ HttpServerConnection::CloseConnection() } void +HttpServerConnection::SendInlineResponse(uint32_t RequestNumber, + std::string_view StatusLine, + std::string_view Headers, + std::string_view Body) +{ + ExtendableStringBuilder<256> ResponseBuilder; + ResponseBuilder << "HTTP/1.1 " << StatusLine << "\r\n"; + if (!Headers.empty()) + { + ResponseBuilder << Headers; + } + if (!m_RequestData.IsKeepAlive()) + { + ResponseBuilder << "Connection: close\r\n"; + } + ResponseBuilder << "\r\n"; + if (!Body.empty()) + { + ResponseBuilder << Body; + } + auto ResponseView = ResponseBuilder.ToView(); + IoBuffer ResponseData(IoBuffer::Clone, ResponseView.data(), ResponseView.size()); + auto Buffer = asio::buffer(ResponseData.GetData(), ResponseData.GetSize()); + asio::async_write( + *m_Socket.get(), + Buffer, + [Conn = AsSharedPtr(), RequestNumber, Response = std::move(ResponseData)](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); +} + +void HttpServerConnection::HandleRequest() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1204,7 +1239,9 @@ HttpServerConnection::HandleRequest() return; } - Ref<WsAsioConnection> WsConn(new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler)); + Conn->m_Server.m_HttpServer->OnWebSocketConnectionOpened(); + Ref<WsAsioConnection> WsConn( + new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler, Conn->m_Server.m_HttpServer)); Ref<WebSocketConnection> WsConnRef(WsConn.Get()); WsHandler->OnWebSocketOpen(std::move(WsConnRef)); @@ -1241,6 +1278,8 @@ HttpServerConnection::HandleRequest() { ZEN_TRACE_CPU("asio::HandleRequest"); + m_Server.m_HttpServer->MarkRequest(); + auto RemoteEndpoint = m_Socket->remote_endpoint(); bool IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address(); @@ -1378,51 +1417,24 @@ HttpServerConnection::HandleRequest() } } - if (m_RequestData.RequestVerb() == HttpVerb::kHead) + // If a default redirect is configured and the request is for the root path, send a 302 + std::string_view DefaultRedirect = m_Server.m_HttpServer->GetDefaultRedirect(); + if (!DefaultRedirect.empty() && (m_RequestData.Url() == "/" || m_RequestData.Url().empty())) { - std::string_view Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "\r\n"sv; - - if (!m_RequestData.IsKeepAlive()) - { - Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Connection: close\r\n" - "\r\n"sv; - } - - asio::async_write(*m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); - }); + ExtendableStringBuilder<128> Headers; + Headers << "Location: " << DefaultRedirect << "\r\nContent-Length: 0\r\n"; + SendInlineResponse(RequestNumber, "302 Found"sv, Headers.ToView()); + } + else if (m_RequestData.RequestVerb() == HttpVerb::kHead) + { + SendInlineResponse(RequestNumber, "404 NOT FOUND"sv); } else { - std::string_view Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Content-Length: 23\r\n" - "Content-Type: text/plain\r\n" - "\r\n" - "No suitable route found"sv; - - if (!m_RequestData.IsKeepAlive()) - { - Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Content-Length: 23\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n" - "\r\n" - "No suitable route found"sv; - } - - asio::async_write(*m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); - }); + SendInlineResponse(RequestNumber, + "404 NOT FOUND"sv, + "Content-Length: 23\r\nContent-Type: text/plain\r\n"sv, + "No suitable route found"sv); } } @@ -1448,8 +1460,11 @@ struct HttpAcceptor m_Acceptor.set_option(exclusive_address(true)); m_AlternateProtocolAcceptor.set_option(exclusive_address(true)); #else // ZEN_PLATFORM_WINDOWS - m_Acceptor.set_option(asio::socket_base::reuse_address(false)); - m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false)); + // Allow binding to a port in TIME_WAIT so the server can restart immediately + // after a previous instance exits. On Linux this does not allow two processes + // to actively listen on the same port simultaneously. + m_Acceptor.set_option(asio::socket_base::reuse_address(true)); + m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(true)); #endif // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); @@ -2092,6 +2107,7 @@ HttpAsioServer::HttpAsioServer(const AsioConfig& Config) : m_InitialConfig(Config) , m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>()) { + m_Impl->m_HttpServer = this; ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser)); } diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 021b941bd..4bf8c61bb 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -378,6 +378,8 @@ HttpPluginConnectionHandler::HandleRequest() { ZEN_TRACE_CPU("http_plugin::HandleRequest"); + m_Server->MarkRequest(); + HttpPluginServerRequest Request(m_RequestParser, *Service, m_RequestParser.Body()); const HttpVerb RequestVerb = Request.RequestVerb(); diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index cf639c114..dfe6bb6aa 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -451,6 +451,8 @@ public: inline uint16_t GetResponseCode() const { return m_ResponseCode; } inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } + void SetLocationHeader(std::string_view Location) { m_LocationHeader = Location; } + private: eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes @@ -460,6 +462,7 @@ private: bool m_IsInitialResponse = true; HttpContentType m_ContentType = HttpContentType::kBinary; eastl::fixed_vector<IoBuffer, 16> m_DataBuffers; + std::string m_LocationHeader; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs); }; @@ -715,6 +718,15 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) ContentTypeHeader->pRawValue = ContentTypeString.data(); ContentTypeHeader->RawValueLength = (USHORT)ContentTypeString.size(); + // Location header (for redirects) + + if (!m_LocationHeader.empty()) + { + PHTTP_KNOWN_HEADER LocationHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderLocation]; + LocationHeader->pRawValue = m_LocationHeader.data(); + LocationHeader->RawValueLength = (USHORT)m_LocationHeader.size(); + } + std::string_view ReasonString = ReasonStringForHttpResultCode(m_ResponseCode); HttpResponse.StatusCode = m_ResponseCode; @@ -916,7 +928,10 @@ HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTr ZEN_UNUSED(IoResult, NumberOfBytesTransferred); - ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {} ({:#x}), NumberOfBytesTransferred: {}", + GetSystemErrorAsString(IoResult), + IoResult, + NumberOfBytesTransferred); return this; } @@ -1083,7 +1098,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to create server session for '{}': {} ({:#x})", + WideToUtf8(WildcardUrlPath), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1092,7 +1110,7 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to create URL group for '{}': {} ({:#x})", WideToUtf8(WildcardUrlPath), GetSystemErrorAsString(Result), Result); return 0; } @@ -1116,7 +1134,9 @@ HttpSysServer::InitializeServer(int BasePort) if ((Result == ERROR_SHARING_VIOLATION)) { - ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result); + ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", + EffectivePort, + GetSystemErrorAsString(Result)); Sleep(500); Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); @@ -1138,7 +1158,9 @@ HttpSysServer::InitializeServer(int BasePort) { for (uint32_t Retries = 0; (Result == ERROR_SHARING_VIOLATION) && (Retries < 3); Retries++) { - ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result); + ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", + EffectivePort, + GetSystemErrorAsString(Result)); Sleep(500); Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); } @@ -1173,17 +1195,18 @@ HttpSysServer::InitializeServer(int BasePort) const std::u8string_view Hosts[] = {u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv}; - ULONG InternalResult = ERROR_SHARING_VIOLATION; - for (int PortOffset = 0; (InternalResult == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset) + bool ShouldRetryNextPort = true; + for (int PortOffset = 0; ShouldRetryNextPort && (PortOffset < 10); ++PortOffset) { - EffectivePort = BasePort + (PortOffset * 100); + EffectivePort = BasePort + (PortOffset * 100); + ShouldRetryNextPort = false; for (const std::u8string_view Host : Hosts) { WideStringBuilder<64> LocalUrlPath; LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(EffectivePort) << u8"/"sv; - InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); + ULONG InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); if (InternalResult == NO_ERROR) { @@ -1191,11 +1214,25 @@ HttpSysServer::InitializeServer(int BasePort) m_BaseUris.push_back(LocalUrlPath.c_str()); } + else if (InternalResult == ERROR_SHARING_VIOLATION || InternalResult == ERROR_ACCESS_DENIED) + { + // Port may be owned by another process's wildcard registration (access denied) + // or actively in use (sharing violation) — retry on a different port + ShouldRetryNextPort = true; + } else { - break; + ZEN_WARN("Failed to register local handler '{}': {} ({:#x})", + WideToUtf8(LocalUrlPath), + GetSystemErrorAsString(InternalResult), + InternalResult); } } + + if (!m_BaseUris.empty()) + { + break; + } } } else @@ -1211,7 +1248,10 @@ HttpSysServer::InitializeServer(int BasePort) if (m_BaseUris.empty()) { - ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to add base URL to URL group for '{}': {} ({:#x})", + WideToUtf8(WildcardUrlPath), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1229,7 +1269,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); + ZEN_ERROR("Failed to create request queue for '{}': {} ({:#x})", + WideToUtf8(m_BaseUris.front()), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1241,7 +1284,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); + ZEN_ERROR("Failed to set server binding property for '{}': {} ({:#x})", + WideToUtf8(m_BaseUris.front()), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1273,7 +1319,7 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_WARN("changing request queue length to {} failed: {}", QueueLength, Result); + ZEN_WARN("changing request queue length to {} failed: {} ({:#x})", QueueLength, GetSystemErrorAsString(Result), Result); } } @@ -1295,21 +1341,6 @@ HttpSysServer::InitializeServer(int BasePort) ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front())); } - // This is not available in all Windows SDK versions so for now we can't use recently - // released functionality. We should investigate how to get more recent SDK releases - // into the build - -# if 0 - if (HttpIsFeatureSupported(/* HttpFeatureHttp3 */ (HTTP_FEATURE_ID) 4)) - { - ZEN_DEBUG("HTTP3 is available"); - } - else - { - ZEN_DEBUG("HTTP3 is NOT available"); - } -# endif - return EffectivePort; } @@ -1695,6 +1726,8 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) { HttpSysServerRequest& ThisRequest = m_HandlerRequest.emplace(*this, Service, Payload); + m_HttpServer.MarkRequest(); + // Default request handling # if ZEN_WITH_OTEL @@ -2245,8 +2278,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT if (SendResult == NO_ERROR) { - Ref<WsHttpSysConnection> WsConn( - new WsHttpSysConnection(RequestQueueHandle, RequestId, *WsHandler, Transaction().Iocp())); + Transaction().Server().OnWebSocketConnectionOpened(); + Ref<WsHttpSysConnection> WsConn(new WsHttpSysConnection(RequestQueueHandle, + RequestId, + *WsHandler, + Transaction().Iocp(), + &Transaction().Server())); Ref<WebSocketConnection> WsConnRef(WsConn.Get()); WsHandler->OnWebSocketOpen(std::move(WsConnRef)); @@ -2255,7 +2292,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT return nullptr; } - ZEN_WARN("WebSocket 101 send failed: {}", SendResult); + ZEN_WARN("WebSocket 101 send failed: {} ({:#x})", GetSystemErrorAsString(SendResult), SendResult); // WebSocket upgrade failed — return nullptr since ServerRequest() // was never populated (no InvokeRequestHandler call) @@ -2330,6 +2367,18 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } + else + { + // If a default redirect is configured and the request is for the root path, send a 302 + std::string_view DefaultRedirect = Transaction().Server().GetDefaultRedirect(); + std::string_view RawUrl(HttpReq->pRawUrl, HttpReq->RawUrlLength); + if (!DefaultRedirect.empty() && (RawUrl == "/" || RawUrl.empty())) + { + auto* Response = new HttpMessageResponseRequest(Transaction(), 302); + Response->SetLocationHeader(DefaultRedirect); + return Response; + } + } // Unable to route return new HttpMessageResponseRequest(Transaction(), 404, "No suitable route found"sv); diff --git a/src/zenhttp/servers/wsasio.cpp b/src/zenhttp/servers/wsasio.cpp index 3e31b58bc..b2543277a 100644 --- a/src/zenhttp/servers/wsasio.cpp +++ b/src/zenhttp/servers/wsasio.cpp @@ -4,6 +4,7 @@ #include "wsframecodec.h" #include <zencore/logging.h> +#include <zenhttp/httpserver.h> namespace zen::asio_http { @@ -16,15 +17,20 @@ WsLog() ////////////////////////////////////////////////////////////////////////// -WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler) +WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server) : m_Socket(std::move(Socket)) , m_Handler(Handler) +, m_HttpServer(Server) { } WsAsioConnection::~WsAsioConnection() { m_IsOpen.store(false); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketConnectionClosed(); + } } void @@ -101,6 +107,11 @@ WsAsioConnection::ProcessReceivedData() m_ReadBuffer.consume(Frame.BytesConsumed); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed); + } + switch (Frame.Opcode) { case WebSocketOpcode::kText: @@ -219,6 +230,11 @@ WsAsioConnection::DoClose(uint16_t Code, std::string_view Reason) void WsAsioConnection::EnqueueWrite(std::vector<uint8_t> Frame) { + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameSent(Frame.size()); + } + bool ShouldFlush = false; m_WriteLock.WithExclusiveLock([&] { diff --git a/src/zenhttp/servers/wsasio.h b/src/zenhttp/servers/wsasio.h index d8ffdc00a..e8bb3b1d2 100644 --- a/src/zenhttp/servers/wsasio.h +++ b/src/zenhttp/servers/wsasio.h @@ -14,6 +14,10 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <memory> #include <vector> +namespace zen { +class HttpServer; +} // namespace zen + namespace zen::asio_http { /** @@ -27,10 +31,11 @@ namespace zen::asio_http { * connection alive for the duration of the async operation. The service layer * also holds a Ref<WebSocketConnection>. */ + class WsAsioConnection : public WebSocketConnection { public: - WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler); + WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server); ~WsAsioConnection() override; /** @@ -58,6 +63,7 @@ private: std::unique_ptr<asio::ip::tcp::socket> m_Socket; IWebSocketHandler& m_Handler; + zen::HttpServer* m_HttpServer; asio::streambuf m_ReadBuffer; RwLock m_WriteLock; diff --git a/src/zenhttp/servers/wshttpsys.cpp b/src/zenhttp/servers/wshttpsys.cpp index 3408b64b3..af320172d 100644 --- a/src/zenhttp/servers/wshttpsys.cpp +++ b/src/zenhttp/servers/wshttpsys.cpp @@ -7,6 +7,7 @@ # include "wsframecodec.h" # include <zencore/logging.h> +# include <zenhttp/httpserver.h> namespace zen { @@ -19,11 +20,16 @@ WsHttpSysLog() ////////////////////////////////////////////////////////////////////////// -WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp) +WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle, + HTTP_REQUEST_ID RequestId, + IWebSocketHandler& Handler, + PTP_IO Iocp, + HttpServer* Server) : m_RequestQueueHandle(RequestQueueHandle) , m_RequestId(RequestId) , m_Handler(Handler) , m_Iocp(Iocp) +, m_HttpServer(Server) , m_ReadBuffer(8192) { m_ReadIoContext.ContextType = HttpSysIoContext::Type::kWebSocketRead; @@ -40,6 +46,11 @@ WsHttpSysConnection::~WsHttpSysConnection() { Disconnect(); } + + if (m_HttpServer) + { + m_HttpServer->OnWebSocketConnectionClosed(); + } } void @@ -174,6 +185,11 @@ WsHttpSysConnection::ProcessReceivedData() // Remove consumed bytes m_Accumulated.erase(m_Accumulated.begin(), m_Accumulated.begin() + Frame.BytesConsumed); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed); + } + switch (Frame.Opcode) { case WebSocketOpcode::kText: @@ -250,6 +266,11 @@ WsHttpSysConnection::ProcessReceivedData() void WsHttpSysConnection::EnqueueWrite(std::vector<uint8_t> Frame) { + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameSent(Frame.size()); + } + bool ShouldFlush = false; { diff --git a/src/zenhttp/servers/wshttpsys.h b/src/zenhttp/servers/wshttpsys.h index d854289e0..6015e3873 100644 --- a/src/zenhttp/servers/wshttpsys.h +++ b/src/zenhttp/servers/wshttpsys.h @@ -19,6 +19,8 @@ namespace zen { +class HttpServer; + /** * WebSocket connection over an http.sys opaque-mode connection * @@ -37,7 +39,7 @@ namespace zen { class WsHttpSysConnection : public WebSocketConnection { public: - WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp); + WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp, HttpServer* Server); ~WsHttpSysConnection() override; /** @@ -75,6 +77,7 @@ private: HTTP_REQUEST_ID m_RequestId; IWebSocketHandler& m_Handler; PTP_IO m_Iocp; + HttpServer* m_HttpServer; // Tagged OVERLAPPED contexts for concurrent read and write HttpSysIoContext m_ReadIoContext{}; diff --git a/src/zenhttp/servers/wstest.cpp b/src/zenhttp/servers/wstest.cpp index fd023c490..2134e4ff1 100644 --- a/src/zenhttp/servers/wstest.cpp +++ b/src/zenhttp/servers/wstest.cpp @@ -767,13 +767,12 @@ namespace { void OnWsMessage(const WebSocketMessage& Msg) override { - m_MessageCount.fetch_add(1); - if (Msg.Opcode == WebSocketOpcode::kText) { std::string_view Text(static_cast<const char*>(Msg.Payload.Data()), Msg.Payload.Size()); m_LastMessage = std::string(Text); } + m_MessageCount.fetch_add(1); } void OnWsClose(uint16_t Code, [[maybe_unused]] std::string_view Reason) override |