aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/httpsessions.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/sessions/httpsessions.h')
-rw-r--r--src/zenserver/sessions/httpsessions.h60
1 files changed, 56 insertions, 4 deletions
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h
index 6ebe61c8d..2c0185176 100644
--- a/src/zenserver/sessions/httpsessions.h
+++ b/src/zenserver/sessions/httpsessions.h
@@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
+
namespace zen {
class SessionsService;
@@ -69,14 +71,64 @@ private:
SessionsStats m_SessionsStats;
metrics::OperationTiming m_HttpRequests;
- // WebSocket push
- RwLock m_WsConnectionsLock;
- std::vector<Ref<WebSocketConnection>> m_WsConnections;
- asio::steady_timer m_PushTimer;
+ // WebSocket push.
+ //
+ // Each connection can subscribe to a single session's log stream. The
+ // subscription is optional (SubscribedSessionId == Oid::Zero means
+ // "session-list broadcasts only"). LastSentCursor is the cursor value
+ // most recently delivered for the subscribed session; the broadcaster
+ // uses it to pull the correct delta from the service.
+ //
+ // Id is a process-monotonic generation token assigned at OnWebSocket-
+ // Open. Pointer matching is fine for OnWebSocketMessage /
+ // OnWebSocketClose where the live `WebSocketConnection&` parameter is
+ // unambiguous; Id-based matching is reserved for any future code path
+ // that wants to refer to a subscriber across lock releases without
+ // risking a slot-reuse mix-up.
+ struct WsSubscriber
+ {
+ Ref<WebSocketConnection> Connection;
+ uint64_t Id = 0;
+ Oid SubscribedSessionId = Oid::Zero;
+ uint64_t LastSentCursor = 0;
+
+ // True iff the subscriber is currently subscribed to `Session`.
+ // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel
+ // check that broadcaster, cursor-bump path, and any future filter
+ // would otherwise each open-code.
+ bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; }
+
+ // Drop the active subscription. After this returns, IsSubscribedTo
+ // is false for every session id.
+ void Unsubscribe()
+ {
+ SubscribedSessionId = Oid::Zero;
+ LastSentCursor = 0;
+ }
+ };
+ RwLock m_WsConnectionsLock;
+ std::vector<WsSubscriber> m_WsConnections;
+ std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned"
+ asio::steady_timer m_PushTimer;
void BroadcastSessions();
void EnqueuePushTimer();
+ // Event-driven log push. Called from SessionsService's log-appended
+ // callback; iterates subscribers of the given session and ships any
+ // entries they haven't seen yet.
+ void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor);
+
+ // Periodic cleanup of old / excess ended sessions
+ asio::steady_timer m_CleanupTimer;
+ void EnqueueCleanupTimer();
+ void RunCleanup();
+
+ // Periodic client-process liveness check for locally-connected sessions.
+ asio::steady_timer m_LivenessTimer;
+ void EnqueueLivenessTimer();
+ void RunLivenessCheck();
+
Oid m_SelfSessionId = Oid::Zero;
CbObject BuildSessionListResponse();