// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen { class SessionsService; class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler { public: HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions, asio::io_context& IoContext); virtual ~HttpSessionsService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual void HandleStatusRequest(HttpServerRequest& Request) override; virtual void HandleStatsRequest(HttpServerRequest& Request) override; virtual CbObject CollectStats() override; virtual uint64_t GetActivityCounter() override; void SetSelfSessionId(const Oid& Id) { m_SelfSessionId = Id; } // IWebSocketHandler void OnWebSocketOpen(Ref Connection, std::string_view RelativeUri) override; void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override; void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override; private: struct SessionsStats { std::atomic_uint64_t SessionReadCount{}; std::atomic_uint64_t SessionWriteCount{}; std::atomic_uint64_t SessionDeleteCount{}; std::atomic_uint64_t SessionListCount{}; std::atomic_uint64_t RequestCount{}; std::atomic_uint64_t BadRequestCount{}; }; inline LoggerRef Log() { return m_Log; } LoggerRef m_Log; void Initialize(); void ListSessionsRequest(HttpRouterRequest& Req); void SessionRequest(HttpRouterRequest& Req); void SessionLogRequest(HttpRouterRequest& Req); HttpStatusService& m_StatusService; HttpStatsService& m_StatsService; HttpRequestRouter m_Router; SessionsService& m_Sessions; SessionsStats m_SessionsStats; metrics::OperationTiming m_HttpRequests; // WebSocket push. // // Each connection can subscribe to a single session's log stream. The // subscription is optional (SubscribedSessionId == Oid::Zero means // "session-list broadcasts only"). LastSentCursor is the cursor value // most recently delivered for the subscribed session; the broadcaster // uses it to pull the correct delta from the service. // // Id is a process-monotonic generation token assigned at OnWebSocket- // Open. Pointer matching is fine for OnWebSocketMessage / // OnWebSocketClose where the live `WebSocketConnection&` parameter is // unambiguous; Id-based matching is reserved for any future code path // that wants to refer to a subscriber across lock releases without // risking a slot-reuse mix-up. struct WsSubscriber { Ref Connection; uint64_t Id = 0; Oid SubscribedSessionId = Oid::Zero; uint64_t LastSentCursor = 0; // True iff the subscriber is currently subscribed to `Session`. // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel // check that broadcaster, cursor-bump path, and any future filter // would otherwise each open-code. bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; } // Drop the active subscription. After this returns, IsSubscribedTo // is false for every session id. void Unsubscribe() { SubscribedSessionId = Oid::Zero; LastSentCursor = 0; } }; RwLock m_WsConnectionsLock; std::vector m_WsConnections; std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned" asio::steady_timer m_PushTimer; void BroadcastSessions(); void EnqueuePushTimer(); // Event-driven log push. Called from SessionsService's log-appended // callback; iterates subscribers of the given session and ships any // entries they haven't seen yet. void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor); // Periodic cleanup of old / excess ended sessions asio::steady_timer m_CleanupTimer; void EnqueueCleanupTimer(); void RunCleanup(); // Periodic client-process liveness check for locally-connected sessions. asio::steady_timer m_LivenessTimer; void EnqueueLivenessTimer(); void RunLivenessCheck(); Oid m_SelfSessionId = Oid::Zero; CbObject BuildSessionListResponse(); }; } // namespace zen