// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
#include <zenhttp/websocket.h>
#include <zentelemetry/stats.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
#include <atomic>
namespace zen {
class SessionsService;
class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler
{
public:
HttpSessionsService(HttpStatusService& StatusService,
HttpStatsService& StatsService,
SessionsService& Sessions,
asio::io_context& IoContext);
virtual ~HttpSessionsService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual CbObject CollectStats() override;
virtual uint64_t GetActivityCounter() override;
void SetSelfSessionId(const Oid& Id) { m_SelfSessionId = Id; }
// IWebSocketHandler
void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) override;
void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override;
void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override;
private:
struct SessionsStats
{
std::atomic_uint64_t SessionReadCount{};
std::atomic_uint64_t SessionWriteCount{};
std::atomic_uint64_t SessionDeleteCount{};
std::atomic_uint64_t SessionListCount{};
std::atomic_uint64_t RequestCount{};
std::atomic_uint64_t BadRequestCount{};
};
inline LoggerRef Log() { return m_Log; }
LoggerRef m_Log;
void Initialize();
void ListSessionsRequest(HttpRouterRequest& Req);
void SessionRequest(HttpRouterRequest& Req);
void SessionLogRequest(HttpRouterRequest& Req);
HttpStatusService& m_StatusService;
HttpStatsService& m_StatsService;
HttpRequestRouter m_Router;
SessionsService& m_Sessions;
SessionsStats m_SessionsStats;
metrics::OperationTiming m_HttpRequests;
// WebSocket push.
//
// Each connection can subscribe to a single session's log stream. The
// subscription is optional (SubscribedSessionId == Oid::Zero means
// "session-list broadcasts only"). LastSentCursor is the cursor value
// most recently delivered for the subscribed session; the broadcaster
// uses it to pull the correct delta from the service.
//
// Id is a process-monotonic generation token assigned at OnWebSocket-
// Open. Pointer matching is fine for OnWebSocketMessage /
// OnWebSocketClose where the live `WebSocketConnection&` parameter is
// unambiguous; Id-based matching is reserved for any future code path
// that wants to refer to a subscriber across lock releases without
// risking a slot-reuse mix-up.
struct WsSubscriber
{
Ref<WebSocketConnection> Connection;
uint64_t Id = 0;
Oid SubscribedSessionId = Oid::Zero;
uint64_t LastSentCursor = 0;
// True iff the subscriber is currently subscribed to `Session`.
// Centralizes the (SubscribedSessionId != Oid::Zero) sentinel
// check that broadcaster, cursor-bump path, and any future filter
// would otherwise each open-code.
bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; }
// Drop the active subscription. After this returns, IsSubscribedTo
// is false for every session id.
void Unsubscribe()
{
SubscribedSessionId = Oid::Zero;
LastSentCursor = 0;
}
};
RwLock m_WsConnectionsLock;
std::vector<WsSubscriber> m_WsConnections;
std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned"
asio::steady_timer m_PushTimer;
void BroadcastSessions();
void EnqueuePushTimer();
// Event-driven log push. Called from SessionsService's log-appended
// callback; iterates subscribers of the given session and ships any
// entries they haven't seen yet.
void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor);
// Periodic cleanup of old / excess ended sessions
asio::steady_timer m_CleanupTimer;
void EnqueueCleanupTimer();
void RunCleanup();
// Periodic client-process liveness check for locally-connected sessions.
asio::steady_timer m_LivenessTimer;
void EnqueueLivenessTimer();
void RunLivenessCheck();
Oid m_SelfSessionId = Oid::Zero;
CbObject BuildSessionListResponse();
};
} // namespace zen