diff options
| author | Stefan Boberg <[email protected]> | 2026-03-09 17:43:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-09 17:43:08 +0100 |
| commit | b37b34ea6ad906f54e8104526e77ba66aed997da (patch) | |
| tree | e80ce17d666aff6d2f0d73d4977128ffb4055476 /src/zenhttp | |
| parent | add fallback for zencache multirange (#816) (diff) | |
| download | zen-b37b34ea6ad906f54e8104526e77ba66aed997da.tar.xz zen-b37b34ea6ad906f54e8104526e77ba66aed997da.zip | |
Dashboard overhaul, compute integration (#814)
- **Frontend dashboard overhaul**: Unified compute/main dashboards into a single shared UI. Added new pages for cache, projects, metrics, sessions, info (build/runtime config, system stats). Added live-update via WebSockets with pause control, sortable detail tables, themed styling. Refactored compute/hub/orchestrator pages into modular JS.
- **HTTP server fixes and stats**: Fixed http.sys local-only fallback when default port is in use, implemented root endpoint redirect for http.sys, fixed Linux/Mac port reuse. Added /stats endpoint exposing HTTP server metrics (bytes transferred, request rates). Added WebSocket stats tracking.
- **OTEL/diagnostics hardening**: Improved OTLP HTTP exporter with better error handling and resilience. Extended diagnostics services configuration.
- **Session management**: Added new sessions service with HTTP endpoints for registering, updating, querying, and removing sessions. Includes session log file support. This is still WIP.
- **CLI subcommand support**: Added support for commands with subcommands in the zen CLI tool, with improved command dispatch.
- **Misc**: Exposed CPU usage/hostname to frontend, fixed JS compact binary float32/float64 decoding, limited projects displayed on front page to 25 sorted by last access, added vscode:// link support.
Also contains some fixes from TSAN analysis.
Diffstat (limited to 'src/zenhttp')
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 34 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 9 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 73 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpstats.h | 47 | ||||
| -rw-r--r-- | src/zenhttp/monitoring/httpstats.cpp | 195 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 104 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpplugin.cpp | 2 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 113 | ||||
| -rw-r--r-- | src/zenhttp/servers/wsasio.cpp | 18 | ||||
| -rw-r--r-- | src/zenhttp/servers/wsasio.h | 8 | ||||
| -rw-r--r-- | src/zenhttp/servers/wshttpsys.cpp | 23 | ||||
| -rw-r--r-- | src/zenhttp/servers/wshttpsys.h | 5 | ||||
| -rw-r--r-- | src/zenhttp/servers/wstest.cpp | 3 |
13 files changed, 537 insertions, 97 deletions
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index d798c46d9..9bae95690 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -28,6 +28,7 @@ #include <zencore/thread.h> #include <zenhttp/packageformat.h> #include <zentelemetry/otlptrace.h> +#include <zentelemetry/stats.h> #include <charconv> #include <mutex> @@ -1094,6 +1095,39 @@ HttpServer::SetHttpRequestFilter(IHttpRequestFilter* RequestFilter) OnSetHttpRequestFilter(RequestFilter); } +CbObject +HttpServer::CollectStats() +{ + CbObjectWriter Cbo; + + metrics::EmitSnapshot("requests", m_RequestMeter, Cbo); + + Cbo.BeginObject("bytes"); + { + Cbo << "received" << GetTotalBytesReceived(); + Cbo << "sent" << GetTotalBytesSent(); + } + Cbo.EndObject(); + + Cbo.BeginObject("websockets"); + { + Cbo << "active_connections" << GetActiveWebSocketConnectionCount(); + Cbo << "frames_received" << m_WsFramesReceived.load(std::memory_order_relaxed); + Cbo << "frames_sent" << m_WsFramesSent.load(std::memory_order_relaxed); + Cbo << "bytes_received" << m_WsBytesReceived.load(std::memory_order_relaxed); + Cbo << "bytes_sent" << m_WsBytesSent.load(std::memory_order_relaxed); + } + Cbo.EndObject(); + + return Cbo.Save(); +} + +void +HttpServer::HandleStatsRequest(HttpServerRequest& Request) +{ + Request.WriteResponse(HttpResponseCode::OK, CollectStats()); +} + ////////////////////////////////////////////////////////////////////////// HttpRpcHandler::HttpRpcHandler() diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index bec4984db..1bb36a298 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -118,6 +118,15 @@ private: class HttpClientBase; +/** HTTP Client + * + * This is safe for use on multiple threads simultaneously, as each + * instance maintains an internal connection pool and will synchronize + * access to it as needed. + * + * Uses libcurl under the hood. We currently only use HTTP 1.1 features. + * + */ class HttpClient { public: diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index c1152dc3e..0e1714669 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -13,6 +13,8 @@ #include <zencore/uid.h> #include <zenhttp/httpcommon.h> +#include <zentelemetry/stats.h> + #include <functional> #include <gsl/gsl-lite.hpp> #include <list> @@ -203,12 +205,34 @@ private: int m_UriPrefixLength = 0; }; +struct IHttpStatsProvider +{ + /** Handle an HTTP stats request, writing the response directly. + * Implementations may inspect query parameters on the request + * to include optional detailed breakdowns. + */ + virtual void HandleStatsRequest(HttpServerRequest& Request) = 0; + + /** Return the provider's current stats as a CbObject snapshot. + * Used by the WebSocket push thread to broadcast live updates + * without requiring an HttpServerRequest. Providers that do + * not override this will be skipped in WebSocket broadcasts. + */ + virtual CbObject CollectStats() { return {}; } +}; + +struct IHttpStatsService +{ + virtual void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) = 0; + virtual void UnregisterHandler(std::string_view Id, IHttpStatsProvider& Provider) = 0; +}; + /** HTTP server * * Implements the main event loop to service HTTP requests, and handles routing * requests to the appropriate handler as registered via RegisterService */ -class HttpServer : public RefCounted +class HttpServer : public RefCounted, public IHttpStatsProvider { public: void RegisterService(HttpService& Service); @@ -235,10 +259,46 @@ public: virtual uint64_t GetTotalBytesReceived() const { return 0; } virtual uint64_t GetTotalBytesSent() const { return 0; } + /** Mark that a request has been handled. Called by server implementations. */ + void MarkRequest() { m_RequestMeter.Mark(); } + + /** Set a default redirect path for root requests */ + void SetDefaultRedirect(std::string_view Path) { m_DefaultRedirect = Path; } + + std::string_view GetDefaultRedirect() const { return m_DefaultRedirect; } + + /** Track active WebSocket connections — called by server implementations on upgrade/close. */ + void OnWebSocketConnectionOpened() { m_ActiveWebSocketConnections.fetch_add(1, std::memory_order_relaxed); } + void OnWebSocketConnectionClosed() { m_ActiveWebSocketConnections.fetch_sub(1, std::memory_order_relaxed); } + uint64_t GetActiveWebSocketConnectionCount() const { return m_ActiveWebSocketConnections.load(std::memory_order_relaxed); } + + /** Track WebSocket frame and byte counters — called by WS connection implementations per frame. */ + void OnWebSocketFrameReceived(uint64_t Bytes) + { + m_WsFramesReceived.fetch_add(1, std::memory_order_relaxed); + m_WsBytesReceived.fetch_add(Bytes, std::memory_order_relaxed); + } + void OnWebSocketFrameSent(uint64_t Bytes) + { + m_WsFramesSent.fetch_add(1, std::memory_order_relaxed); + m_WsBytesSent.fetch_add(Bytes, std::memory_order_relaxed); + } + + // IHttpStatsProvider + virtual CbObject CollectStats() override; + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + private: std::vector<HttpService*> m_KnownServices; int m_EffectivePort = 0; std::string m_ExternalHost; + metrics::Meter m_RequestMeter; + std::string m_DefaultRedirect; + std::atomic<uint64_t> m_ActiveWebSocketConnections{0}; + std::atomic<uint64_t> m_WsFramesReceived{0}; + std::atomic<uint64_t> m_WsFramesSent{0}; + std::atomic<uint64_t> m_WsBytesReceived{0}; + std::atomic<uint64_t> m_WsBytesSent{0}; virtual void OnRegisterService(HttpService& Service) = 0; virtual int OnInitialize(int BasePort, std::filesystem::path DataDir) = 0; @@ -456,17 +516,6 @@ private: bool HandlePackageOffers(HttpService& Service, HttpServerRequest& Request, Ref<IHttpPackageHandler>& PackageHandlerRef); -struct IHttpStatsProvider -{ - virtual void HandleStatsRequest(HttpServerRequest& Request) = 0; -}; - -struct IHttpStatsService -{ - virtual void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) = 0; - virtual void UnregisterHandler(std::string_view Id, IHttpStatsProvider& Provider) = 0; -}; - void http_forcelink(); // internal void websocket_forcelink(); // internal diff --git a/src/zenhttp/include/zenhttp/httpstats.h b/src/zenhttp/include/zenhttp/httpstats.h index e6fea6765..460315faf 100644 --- a/src/zenhttp/include/zenhttp/httpstats.h +++ b/src/zenhttp/include/zenhttp/httpstats.h @@ -3,23 +3,50 @@ #pragma once #include <zencore/logging.h> +#include <zencore/thread.h> #include <zenhttp/httpserver.h> +#include <zenhttp/websocket.h> +#include <atomic> #include <map> +#include <memory> +#include <thread> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio/io_context.hpp> +#include <asio/steady_timer.hpp> +ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -class HttpStatsService : public HttpService, public IHttpStatsService +class HttpStatsService : public HttpService, public IHttpStatsService, public IWebSocketHandler { public: - HttpStatsService(); + /// Construct without an io_context — optionally uses a dedicated push thread + /// for WebSocket stats broadcasting. + explicit HttpStatsService(bool EnableWebSockets = false); + + /// Construct with an external io_context — uses an asio timer instead + /// of a dedicated thread for WebSocket stats broadcasting. + /// The caller must ensure the io_context outlives this service and that + /// its run loop is active. + HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets = true); + ~HttpStatsService(); + void Shutdown(); + virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) override; virtual void UnregisterHandler(std::string_view Id, IHttpStatsProvider& Provider) override; + // IWebSocketHandler + void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override; + void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override; + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override; + private: LoggerRef m_Log; HttpRequestRouter m_Router; @@ -28,6 +55,22 @@ private: RwLock m_Lock; std::map<std::string, IHttpStatsProvider*> m_Providers; + + // WebSocket push + RwLock m_WsConnectionsLock; + std::vector<Ref<WebSocketConnection>> m_WsConnections; + std::atomic<bool> m_PushEnabled{false}; + + void BroadcastStats(); + + // Thread-based push (when no io_context is provided) + std::thread m_PushThread; + Event m_PushEvent; + void PushThreadFunction(); + + // Timer-based push (when an io_context is provided) + std::unique_ptr<asio::steady_timer> m_PushTimer; + void EnqueuePushTimer(); }; } // namespace zen diff --git a/src/zenhttp/monitoring/httpstats.cpp b/src/zenhttp/monitoring/httpstats.cpp index b097a0d3f..2370def0c 100644 --- a/src/zenhttp/monitoring/httpstats.cpp +++ b/src/zenhttp/monitoring/httpstats.cpp @@ -3,15 +3,57 @@ #include "zenhttp/httpstats.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/trace.h> namespace zen { -HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats")) +HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats")) { + if (EnableWebSockets) + { + m_PushEnabled.store(true); + m_PushThread = std::thread([this] { PushThreadFunction(); }); + } +} + +HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats")) +{ + if (EnableWebSockets) + { + m_PushEnabled.store(true); + m_PushTimer = std::make_unique<asio::steady_timer>(IoContext); + EnqueuePushTimer(); + } } HttpStatsService::~HttpStatsService() { + Shutdown(); +} + +void +HttpStatsService::Shutdown() +{ + if (!m_PushEnabled.exchange(false)) + { + return; + } + + if (m_PushTimer) + { + m_PushTimer->cancel(); + m_PushTimer.reset(); + } + + if (m_PushThread.joinable()) + { + m_PushEvent.Set(); + m_PushThread.join(); + } + + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); }); } const char* @@ -39,6 +81,7 @@ HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Pro void HttpStatsService::HandleRequest(HttpServerRequest& Request) { + ZEN_TRACE_CPU("HttpStatsService::HandleRequest"); using namespace std::literals; std::string_view Key = Request.RelativeUri(); @@ -89,4 +132,154 @@ HttpStatsService::HandleRequest(HttpServerRequest& Request) } } +////////////////////////////////////////////////////////////////////////// +// +// IWebSocketHandler +// + +void +HttpStatsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen"); + ZEN_INFO("Stats WebSocket client connected"); + + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); + + // Send initial state immediately + if (m_PushThread.joinable()) + { + m_PushEvent.Set(); + } +} + +void +HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +{ + // No client-to-server messages expected +} + +void +HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose"); + ZEN_INFO("Stats WebSocket client disconnected (code {})", Code); + + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} + +////////////////////////////////////////////////////////////////////////// +// +// Stats broadcast +// + +void +HttpStatsService::BroadcastStats() +{ + ZEN_TRACE_CPU("HttpStatsService::BroadcastStats"); + std::vector<Ref<WebSocketConnection>> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + + if (Connections.empty()) + { + return; + } + + // Collect stats from all providers + ExtendableStringBuilder<4096> JsonBuilder; + JsonBuilder.Append("{"); + + bool First = true; + { + RwLock::SharedLockScope _(m_Lock); + for (auto& [Id, Provider] : m_Providers) + { + CbObject Stats = Provider->CollectStats(); + if (!Stats) + { + continue; + } + + if (!First) + { + JsonBuilder.Append(","); + } + First = false; + + // Emit as "provider_id": { ... } + JsonBuilder.Append("\""); + JsonBuilder.Append(Id); + JsonBuilder.Append("\":"); + + ExtendableStringBuilder<2048> StatsJson; + Stats.ToJson(StatsJson); + JsonBuilder.Append(StatsJson.ToView()); + } + } + + JsonBuilder.Append("}"); + + std::string_view Json = JsonBuilder.ToView(); + for (auto& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendText(Json); + } + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Thread-based push (fallback when no io_context) +// + +void +HttpStatsService::PushThreadFunction() +{ + SetCurrentThreadName("stats_ws_push"); + + while (m_PushEnabled.load()) + { + m_PushEvent.Wait(5000); + m_PushEvent.Reset(); + + if (!m_PushEnabled.load()) + { + break; + } + + BroadcastStats(); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Timer-based push (when io_context is provided) +// + +void +HttpStatsService::EnqueuePushTimer() +{ + if (!m_PushTimer) + { + return; + } + + m_PushTimer->expires_after(std::chrono::seconds(5)); + m_PushTimer->async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + BroadcastStats(); + EnqueuePushTimer(); + }); +} + } // namespace zen diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index 2cf051d14..f5178ebe8 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -531,6 +531,8 @@ public: std::atomic<uint64_t> m_TotalBytesReceived{0}; std::atomic<uint64_t> m_TotalBytesSent{0}; + + HttpServer* m_HttpServer = nullptr; }; /** @@ -949,6 +951,7 @@ private: void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); + void SendInlineResponse(uint32_t RequestNumber, std::string_view StatusLine, std::string_view Headers = {}, std::string_view Body = {}); HttpAsioServerImpl& m_Server; asio::streambuf m_RequestBuffer; @@ -1167,6 +1170,38 @@ HttpServerConnection::CloseConnection() } void +HttpServerConnection::SendInlineResponse(uint32_t RequestNumber, + std::string_view StatusLine, + std::string_view Headers, + std::string_view Body) +{ + ExtendableStringBuilder<256> ResponseBuilder; + ResponseBuilder << "HTTP/1.1 " << StatusLine << "\r\n"; + if (!Headers.empty()) + { + ResponseBuilder << Headers; + } + if (!m_RequestData.IsKeepAlive()) + { + ResponseBuilder << "Connection: close\r\n"; + } + ResponseBuilder << "\r\n"; + if (!Body.empty()) + { + ResponseBuilder << Body; + } + auto ResponseView = ResponseBuilder.ToView(); + IoBuffer ResponseData(IoBuffer::Clone, ResponseView.data(), ResponseView.size()); + auto Buffer = asio::buffer(ResponseData.GetData(), ResponseData.GetSize()); + asio::async_write( + *m_Socket.get(), + Buffer, + [Conn = AsSharedPtr(), RequestNumber, Response = std::move(ResponseData)](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); +} + +void HttpServerConnection::HandleRequest() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1204,7 +1239,9 @@ HttpServerConnection::HandleRequest() return; } - Ref<WsAsioConnection> WsConn(new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler)); + Conn->m_Server.m_HttpServer->OnWebSocketConnectionOpened(); + Ref<WsAsioConnection> WsConn( + new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler, Conn->m_Server.m_HttpServer)); Ref<WebSocketConnection> WsConnRef(WsConn.Get()); WsHandler->OnWebSocketOpen(std::move(WsConnRef)); @@ -1241,6 +1278,8 @@ HttpServerConnection::HandleRequest() { ZEN_TRACE_CPU("asio::HandleRequest"); + m_Server.m_HttpServer->MarkRequest(); + auto RemoteEndpoint = m_Socket->remote_endpoint(); bool IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address(); @@ -1378,51 +1417,24 @@ HttpServerConnection::HandleRequest() } } - if (m_RequestData.RequestVerb() == HttpVerb::kHead) + // If a default redirect is configured and the request is for the root path, send a 302 + std::string_view DefaultRedirect = m_Server.m_HttpServer->GetDefaultRedirect(); + if (!DefaultRedirect.empty() && (m_RequestData.Url() == "/" || m_RequestData.Url().empty())) { - std::string_view Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "\r\n"sv; - - if (!m_RequestData.IsKeepAlive()) - { - Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Connection: close\r\n" - "\r\n"sv; - } - - asio::async_write(*m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); - }); + ExtendableStringBuilder<128> Headers; + Headers << "Location: " << DefaultRedirect << "\r\nContent-Length: 0\r\n"; + SendInlineResponse(RequestNumber, "302 Found"sv, Headers.ToView()); + } + else if (m_RequestData.RequestVerb() == HttpVerb::kHead) + { + SendInlineResponse(RequestNumber, "404 NOT FOUND"sv); } else { - std::string_view Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Content-Length: 23\r\n" - "Content-Type: text/plain\r\n" - "\r\n" - "No suitable route found"sv; - - if (!m_RequestData.IsKeepAlive()) - { - Response = - "HTTP/1.1 404 NOT FOUND\r\n" - "Content-Length: 23\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n" - "\r\n" - "No suitable route found"sv; - } - - asio::async_write(*m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); - }); + SendInlineResponse(RequestNumber, + "404 NOT FOUND"sv, + "Content-Length: 23\r\nContent-Type: text/plain\r\n"sv, + "No suitable route found"sv); } } @@ -1448,8 +1460,11 @@ struct HttpAcceptor m_Acceptor.set_option(exclusive_address(true)); m_AlternateProtocolAcceptor.set_option(exclusive_address(true)); #else // ZEN_PLATFORM_WINDOWS - m_Acceptor.set_option(asio::socket_base::reuse_address(false)); - m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false)); + // Allow binding to a port in TIME_WAIT so the server can restart immediately + // after a previous instance exits. On Linux this does not allow two processes + // to actively listen on the same port simultaneously. + m_Acceptor.set_option(asio::socket_base::reuse_address(true)); + m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(true)); #endif // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); @@ -2092,6 +2107,7 @@ HttpAsioServer::HttpAsioServer(const AsioConfig& Config) : m_InitialConfig(Config) , m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>()) { + m_Impl->m_HttpServer = this; ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser)); } diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 021b941bd..4bf8c61bb 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -378,6 +378,8 @@ HttpPluginConnectionHandler::HandleRequest() { ZEN_TRACE_CPU("http_plugin::HandleRequest"); + m_Server->MarkRequest(); + HttpPluginServerRequest Request(m_RequestParser, *Service, m_RequestParser.Body()); const HttpVerb RequestVerb = Request.RequestVerb(); diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index cf639c114..dfe6bb6aa 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -451,6 +451,8 @@ public: inline uint16_t GetResponseCode() const { return m_ResponseCode; } inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } + void SetLocationHeader(std::string_view Location) { m_LocationHeader = Location; } + private: eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes @@ -460,6 +462,7 @@ private: bool m_IsInitialResponse = true; HttpContentType m_ContentType = HttpContentType::kBinary; eastl::fixed_vector<IoBuffer, 16> m_DataBuffers; + std::string m_LocationHeader; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs); }; @@ -715,6 +718,15 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) ContentTypeHeader->pRawValue = ContentTypeString.data(); ContentTypeHeader->RawValueLength = (USHORT)ContentTypeString.size(); + // Location header (for redirects) + + if (!m_LocationHeader.empty()) + { + PHTTP_KNOWN_HEADER LocationHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderLocation]; + LocationHeader->pRawValue = m_LocationHeader.data(); + LocationHeader->RawValueLength = (USHORT)m_LocationHeader.size(); + } + std::string_view ReasonString = ReasonStringForHttpResultCode(m_ResponseCode); HttpResponse.StatusCode = m_ResponseCode; @@ -916,7 +928,10 @@ HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTr ZEN_UNUSED(IoResult, NumberOfBytesTransferred); - ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {} ({:#x}), NumberOfBytesTransferred: {}", + GetSystemErrorAsString(IoResult), + IoResult, + NumberOfBytesTransferred); return this; } @@ -1083,7 +1098,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to create server session for '{}': {} ({:#x})", + WideToUtf8(WildcardUrlPath), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1092,7 +1110,7 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to create URL group for '{}': {} ({:#x})", WideToUtf8(WildcardUrlPath), GetSystemErrorAsString(Result), Result); return 0; } @@ -1116,7 +1134,9 @@ HttpSysServer::InitializeServer(int BasePort) if ((Result == ERROR_SHARING_VIOLATION)) { - ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result); + ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", + EffectivePort, + GetSystemErrorAsString(Result)); Sleep(500); Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); @@ -1138,7 +1158,9 @@ HttpSysServer::InitializeServer(int BasePort) { for (uint32_t Retries = 0; (Result == ERROR_SHARING_VIOLATION) && (Retries < 3); Retries++) { - ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", EffectivePort, Result); + ZEN_INFO("Desired port {} is in use (HttpAddUrlToUrlGroup returned: {}), retrying", + EffectivePort, + GetSystemErrorAsString(Result)); Sleep(500); Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); } @@ -1173,17 +1195,18 @@ HttpSysServer::InitializeServer(int BasePort) const std::u8string_view Hosts[] = {u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv}; - ULONG InternalResult = ERROR_SHARING_VIOLATION; - for (int PortOffset = 0; (InternalResult == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset) + bool ShouldRetryNextPort = true; + for (int PortOffset = 0; ShouldRetryNextPort && (PortOffset < 10); ++PortOffset) { - EffectivePort = BasePort + (PortOffset * 100); + EffectivePort = BasePort + (PortOffset * 100); + ShouldRetryNextPort = false; for (const std::u8string_view Host : Hosts) { WideStringBuilder<64> LocalUrlPath; LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(EffectivePort) << u8"/"sv; - InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); + ULONG InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); if (InternalResult == NO_ERROR) { @@ -1191,11 +1214,25 @@ HttpSysServer::InitializeServer(int BasePort) m_BaseUris.push_back(LocalUrlPath.c_str()); } + else if (InternalResult == ERROR_SHARING_VIOLATION || InternalResult == ERROR_ACCESS_DENIED) + { + // Port may be owned by another process's wildcard registration (access denied) + // or actively in use (sharing violation) — retry on a different port + ShouldRetryNextPort = true; + } else { - break; + ZEN_WARN("Failed to register local handler '{}': {} ({:#x})", + WideToUtf8(LocalUrlPath), + GetSystemErrorAsString(InternalResult), + InternalResult); } } + + if (!m_BaseUris.empty()) + { + break; + } } } else @@ -1211,7 +1248,10 @@ HttpSysServer::InitializeServer(int BasePort) if (m_BaseUris.empty()) { - ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); + ZEN_ERROR("Failed to add base URL to URL group for '{}': {} ({:#x})", + WideToUtf8(WildcardUrlPath), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1229,7 +1269,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); + ZEN_ERROR("Failed to create request queue for '{}': {} ({:#x})", + WideToUtf8(m_BaseUris.front()), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1241,7 +1284,10 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); + ZEN_ERROR("Failed to set server binding property for '{}': {} ({:#x})", + WideToUtf8(m_BaseUris.front()), + GetSystemErrorAsString(Result), + Result); return 0; } @@ -1273,7 +1319,7 @@ HttpSysServer::InitializeServer(int BasePort) if (Result != NO_ERROR) { - ZEN_WARN("changing request queue length to {} failed: {}", QueueLength, Result); + ZEN_WARN("changing request queue length to {} failed: {} ({:#x})", QueueLength, GetSystemErrorAsString(Result), Result); } } @@ -1295,21 +1341,6 @@ HttpSysServer::InitializeServer(int BasePort) ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front())); } - // This is not available in all Windows SDK versions so for now we can't use recently - // released functionality. We should investigate how to get more recent SDK releases - // into the build - -# if 0 - if (HttpIsFeatureSupported(/* HttpFeatureHttp3 */ (HTTP_FEATURE_ID) 4)) - { - ZEN_DEBUG("HTTP3 is available"); - } - else - { - ZEN_DEBUG("HTTP3 is NOT available"); - } -# endif - return EffectivePort; } @@ -1695,6 +1726,8 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) { HttpSysServerRequest& ThisRequest = m_HandlerRequest.emplace(*this, Service, Payload); + m_HttpServer.MarkRequest(); + // Default request handling # if ZEN_WITH_OTEL @@ -2245,8 +2278,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT if (SendResult == NO_ERROR) { - Ref<WsHttpSysConnection> WsConn( - new WsHttpSysConnection(RequestQueueHandle, RequestId, *WsHandler, Transaction().Iocp())); + Transaction().Server().OnWebSocketConnectionOpened(); + Ref<WsHttpSysConnection> WsConn(new WsHttpSysConnection(RequestQueueHandle, + RequestId, + *WsHandler, + Transaction().Iocp(), + &Transaction().Server())); Ref<WebSocketConnection> WsConnRef(WsConn.Get()); WsHandler->OnWebSocketOpen(std::move(WsConnRef)); @@ -2255,7 +2292,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT return nullptr; } - ZEN_WARN("WebSocket 101 send failed: {}", SendResult); + ZEN_WARN("WebSocket 101 send failed: {} ({:#x})", GetSystemErrorAsString(SendResult), SendResult); // WebSocket upgrade failed — return nullptr since ServerRequest() // was never populated (no InvokeRequestHandler call) @@ -2330,6 +2367,18 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } + else + { + // If a default redirect is configured and the request is for the root path, send a 302 + std::string_view DefaultRedirect = Transaction().Server().GetDefaultRedirect(); + std::string_view RawUrl(HttpReq->pRawUrl, HttpReq->RawUrlLength); + if (!DefaultRedirect.empty() && (RawUrl == "/" || RawUrl.empty())) + { + auto* Response = new HttpMessageResponseRequest(Transaction(), 302); + Response->SetLocationHeader(DefaultRedirect); + return Response; + } + } // Unable to route return new HttpMessageResponseRequest(Transaction(), 404, "No suitable route found"sv); diff --git a/src/zenhttp/servers/wsasio.cpp b/src/zenhttp/servers/wsasio.cpp index 3e31b58bc..b2543277a 100644 --- a/src/zenhttp/servers/wsasio.cpp +++ b/src/zenhttp/servers/wsasio.cpp @@ -4,6 +4,7 @@ #include "wsframecodec.h" #include <zencore/logging.h> +#include <zenhttp/httpserver.h> namespace zen::asio_http { @@ -16,15 +17,20 @@ WsLog() ////////////////////////////////////////////////////////////////////////// -WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler) +WsAsioConnection::WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server) : m_Socket(std::move(Socket)) , m_Handler(Handler) +, m_HttpServer(Server) { } WsAsioConnection::~WsAsioConnection() { m_IsOpen.store(false); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketConnectionClosed(); + } } void @@ -101,6 +107,11 @@ WsAsioConnection::ProcessReceivedData() m_ReadBuffer.consume(Frame.BytesConsumed); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed); + } + switch (Frame.Opcode) { case WebSocketOpcode::kText: @@ -219,6 +230,11 @@ WsAsioConnection::DoClose(uint16_t Code, std::string_view Reason) void WsAsioConnection::EnqueueWrite(std::vector<uint8_t> Frame) { + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameSent(Frame.size()); + } + bool ShouldFlush = false; m_WriteLock.WithExclusiveLock([&] { diff --git a/src/zenhttp/servers/wsasio.h b/src/zenhttp/servers/wsasio.h index d8ffdc00a..e8bb3b1d2 100644 --- a/src/zenhttp/servers/wsasio.h +++ b/src/zenhttp/servers/wsasio.h @@ -14,6 +14,10 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <memory> #include <vector> +namespace zen { +class HttpServer; +} // namespace zen + namespace zen::asio_http { /** @@ -27,10 +31,11 @@ namespace zen::asio_http { * connection alive for the duration of the async operation. The service layer * also holds a Ref<WebSocketConnection>. */ + class WsAsioConnection : public WebSocketConnection { public: - WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler); + WsAsioConnection(std::unique_ptr<asio::ip::tcp::socket> Socket, IWebSocketHandler& Handler, HttpServer* Server); ~WsAsioConnection() override; /** @@ -58,6 +63,7 @@ private: std::unique_ptr<asio::ip::tcp::socket> m_Socket; IWebSocketHandler& m_Handler; + zen::HttpServer* m_HttpServer; asio::streambuf m_ReadBuffer; RwLock m_WriteLock; diff --git a/src/zenhttp/servers/wshttpsys.cpp b/src/zenhttp/servers/wshttpsys.cpp index 3408b64b3..af320172d 100644 --- a/src/zenhttp/servers/wshttpsys.cpp +++ b/src/zenhttp/servers/wshttpsys.cpp @@ -7,6 +7,7 @@ # include "wsframecodec.h" # include <zencore/logging.h> +# include <zenhttp/httpserver.h> namespace zen { @@ -19,11 +20,16 @@ WsHttpSysLog() ////////////////////////////////////////////////////////////////////////// -WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp) +WsHttpSysConnection::WsHttpSysConnection(HANDLE RequestQueueHandle, + HTTP_REQUEST_ID RequestId, + IWebSocketHandler& Handler, + PTP_IO Iocp, + HttpServer* Server) : m_RequestQueueHandle(RequestQueueHandle) , m_RequestId(RequestId) , m_Handler(Handler) , m_Iocp(Iocp) +, m_HttpServer(Server) , m_ReadBuffer(8192) { m_ReadIoContext.ContextType = HttpSysIoContext::Type::kWebSocketRead; @@ -40,6 +46,11 @@ WsHttpSysConnection::~WsHttpSysConnection() { Disconnect(); } + + if (m_HttpServer) + { + m_HttpServer->OnWebSocketConnectionClosed(); + } } void @@ -174,6 +185,11 @@ WsHttpSysConnection::ProcessReceivedData() // Remove consumed bytes m_Accumulated.erase(m_Accumulated.begin(), m_Accumulated.begin() + Frame.BytesConsumed); + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameReceived(Frame.BytesConsumed); + } + switch (Frame.Opcode) { case WebSocketOpcode::kText: @@ -250,6 +266,11 @@ WsHttpSysConnection::ProcessReceivedData() void WsHttpSysConnection::EnqueueWrite(std::vector<uint8_t> Frame) { + if (m_HttpServer) + { + m_HttpServer->OnWebSocketFrameSent(Frame.size()); + } + bool ShouldFlush = false; { diff --git a/src/zenhttp/servers/wshttpsys.h b/src/zenhttp/servers/wshttpsys.h index d854289e0..6015e3873 100644 --- a/src/zenhttp/servers/wshttpsys.h +++ b/src/zenhttp/servers/wshttpsys.h @@ -19,6 +19,8 @@ namespace zen { +class HttpServer; + /** * WebSocket connection over an http.sys opaque-mode connection * @@ -37,7 +39,7 @@ namespace zen { class WsHttpSysConnection : public WebSocketConnection { public: - WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp); + WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp, HttpServer* Server); ~WsHttpSysConnection() override; /** @@ -75,6 +77,7 @@ private: HTTP_REQUEST_ID m_RequestId; IWebSocketHandler& m_Handler; PTP_IO m_Iocp; + HttpServer* m_HttpServer; // Tagged OVERLAPPED contexts for concurrent read and write HttpSysIoContext m_ReadIoContext{}; diff --git a/src/zenhttp/servers/wstest.cpp b/src/zenhttp/servers/wstest.cpp index fd023c490..2134e4ff1 100644 --- a/src/zenhttp/servers/wstest.cpp +++ b/src/zenhttp/servers/wstest.cpp @@ -767,13 +767,12 @@ namespace { void OnWsMessage(const WebSocketMessage& Msg) override { - m_MessageCount.fetch_add(1); - if (Msg.Opcode == WebSocketOpcode::kText) { std::string_view Text(static_cast<const char*>(Msg.Payload.Data()), Msg.Payload.Size()); m_LastMessage = std::string(Text); } + m_MessageCount.fetch_add(1); } void OnWsClose(uint16_t Code, [[maybe_unused]] std::string_view Reason) override |