diff options
Diffstat (limited to 'src/zenserver/sessions/httpsessions.h')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.h | 60 |
1 files changed, 56 insertions, 4 deletions
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h index 6ebe61c8d..2c0185176 100644 --- a/src/zenserver/sessions/httpsessions.h +++ b/src/zenserver/sessions/httpsessions.h @@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio/steady_timer.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> + namespace zen { class SessionsService; @@ -69,14 +71,64 @@ private: SessionsStats m_SessionsStats; metrics::OperationTiming m_HttpRequests; - // WebSocket push - RwLock m_WsConnectionsLock; - std::vector<Ref<WebSocketConnection>> m_WsConnections; - asio::steady_timer m_PushTimer; + // WebSocket push. + // + // Each connection can subscribe to a single session's log stream. The + // subscription is optional (SubscribedSessionId == Oid::Zero means + // "session-list broadcasts only"). LastSentCursor is the cursor value + // most recently delivered for the subscribed session; the broadcaster + // uses it to pull the correct delta from the service. + // + // Id is a process-monotonic generation token assigned at OnWebSocket- + // Open. Pointer matching is fine for OnWebSocketMessage / + // OnWebSocketClose where the live `WebSocketConnection&` parameter is + // unambiguous; Id-based matching is reserved for any future code path + // that wants to refer to a subscriber across lock releases without + // risking a slot-reuse mix-up. + struct WsSubscriber + { + Ref<WebSocketConnection> Connection; + uint64_t Id = 0; + Oid SubscribedSessionId = Oid::Zero; + uint64_t LastSentCursor = 0; + + // True iff the subscriber is currently subscribed to `Session`. + // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel + // check that broadcaster, cursor-bump path, and any future filter + // would otherwise each open-code. + bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; } + + // Drop the active subscription. After this returns, IsSubscribedTo + // is false for every session id. + void Unsubscribe() + { + SubscribedSessionId = Oid::Zero; + LastSentCursor = 0; + } + }; + RwLock m_WsConnectionsLock; + std::vector<WsSubscriber> m_WsConnections; + std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned" + asio::steady_timer m_PushTimer; void BroadcastSessions(); void EnqueuePushTimer(); + // Event-driven log push. Called from SessionsService's log-appended + // callback; iterates subscribers of the given session and ships any + // entries they haven't seen yet. + void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor); + + // Periodic cleanup of old / excess ended sessions + asio::steady_timer m_CleanupTimer; + void EnqueueCleanupTimer(); + void RunCleanup(); + + // Periodic client-process liveness check for locally-connected sessions. + asio::steady_timer m_LivenessTimer; + void EnqueueLivenessTimer(); + void RunLivenessCheck(); + Oid m_SelfSessionId = Oid::Zero; CbObject BuildSessionListResponse(); |