aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/sessions.h
blob: a722704e0a32fd7dcee69db60c9962b1b4e73d60 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include <zencore/compactbinary.h>
#include <zencore/logbase.h>
#include <zencore/memory/memoryarena.h>
#include <zencore/process.h>
#include <zencore/thread.h>
#include <zencore/uid.h>

ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/deque.h>
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
#include <filesystem>
#include <functional>
#include <optional>
#include <span>
#include <string>
#include <vector>

namespace zen {

class SessionLogStore;
class SessionLog;

/** Session tracker
 *
 * Acts as a log and session info concentrator when dealing with multiple
 * servers and external processes acting as a group.
 */

class SessionsService
{
public:
	struct SessionInfo
	{
		Oid			Id;
		std::string AppName;
		std::string Mode;
		std::string Platform;		// Reported by the client, e.g. "windows", "linux", "macos"
		uint32_t	ClientPid = 0;	// Non-zero = local PID to probe for liveness. 0 = don't track.
		Oid			ParentSessionId;
		// Optional task/action identifier used to associate this session with a
		// specific unit of work. Distinct from ParentSessionId, which records
		// process/session ancestry.
		Oid		 JobId;
		CbObject Metadata;
		DateTime CreatedAt;
		DateTime UpdatedAt;
		DateTime EndedAt{0};
	};

	/// Stored form of a log entry. The string fields are arena-borrowed
	/// `const char*` — they live in the owning Session's MemoryArena and
	/// are valid only for that Session's lifetime. Default copy is
	/// intentionally shallow (string pointers are shared with the source);
	/// callers must not let copies outlive the originating Session.
	///
	/// Build entries via `LogEntryInput` and route them through
	/// `Session::AppendLog` / `AppendLogBatch`, which intern logger names
	/// and arena-allocate the other strings before storing.
	struct LogEntry
	{
		DateTime Timestamp{0};
		// Sentinel: Off means "no level set" (e.g. plain-text POSTed entries
		// where the client didn't include a level). Real log entries use
		// Trace..Critical, so Off is free to reuse as "omit on serialize".
		logging::LogLevel Level = logging::Off;
		// Arena pointers (null-terminated). Empty string is the default
		// — never null, so callers don't need to guard.
		const char* LoggerName = "";  // Interned: one canonical copy per unique name across the session.
		const char* Message	   = "";  // For structured entries: the rendered form (populated at intake).
		const char* Format	   = "";  // UE_LOGFMT template; "" for plain entries.
		CbObject	Fields;			  // Present only when Format is non-empty.
	};

	/// Input form used to build an entry on the way into a Session. The
	/// string_view fields are caller-borrowed; AppendLog interns/copies
	/// them into the Session's arena before any LogEntry is built. Use
	/// this struct rather than constructing LogEntry directly so the
	/// arena ownership invariant stays one-sided.
	struct LogEntryInput
	{
		DateTime		  Timestamp{0};
		logging::LogLevel Level = logging::Off;
		std::string_view  LoggerName;
		std::string_view  Message;
		std::string_view  Format;
		CbObject		  Fields;
	};

	class Session : public TRefCounted<Session>
	{
	public:
		Session(const SessionInfo& Info, Ref<SessionLog> Log = {}, ProcessHandle ClientProcess = {});
		~Session();

		Session(Session&&) = delete;
		Session& operator=(Session&&) = delete;

		const SessionInfo& Info() const { return m_Info; }
		void			   UpdateMetadata(CbObjectView Metadata)
		{
			m_Info.Metadata	 = CbObject::Clone(Metadata);
			m_Info.UpdatedAt = DateTime::Now();
		}

		void SetEndedAt(DateTime When) { m_Info.EndedAt = When; }

		/// Appends an entry to the in-memory deque and to the persisted
		/// log. Returns the new cursor value (m_TotalAppended post-
		/// increment). Logger name is interned, message and format are
		/// arena-allocated — the input's string_views may safely be
		/// caller-stack-bound.
		uint64_t AppendLog(LogEntryInput Input);

		/// Append-many counterpart that takes the deque lock exactly once
		/// for the whole batch. Use this when an inbound HTTP POST carries
		/// multiple entries — single-lock semantics keep entries from one
		/// caller contiguous on the wire even when other appends race in,
		/// and the WS-push observer can fire just once for the whole batch.
		/// Returns the new cursor (the value at the tail of the batch).
		uint64_t AppendLogBatch(std::span<LogEntryInput> Inputs);

		std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const;
		uint64_t			  GetLogCount() const;

		/// Returns entries appended after the given cursor and the new cursor value.
		/// A cursor of 0 returns all entries currently in the deque.
		/// The returned LogEntries borrow strings from this Session's
		/// arena — callers must hold a Ref<Session> for as long as they
		/// keep the result.
		struct CursorResult
		{
			std::vector<LogEntry> Entries;
			uint64_t			  Cursor;  // new cursor for next poll
			uint64_t			  Count;   // current deque size
		};
		CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const;

		// Seed this session with pre-existing log entries (e.g. loaded from disk
		// on startup). Sets the total-appended counter to reflect what was on
		// disk so cursors remain meaningful for historical sessions. The inputs
		// are interned/arena-allocated into this session.
		void PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount);

		/// Process handle used for client-liveness checks. Acquired at
		/// registration time (while the pid is known to refer to the reporting
		/// process) and held for the session's lifetime; on Windows this is a
		/// real HANDLE tied to the specific process instance and is immune to
		/// pid reuse. Invalid (IsValid() == false) for remote sessions or when
		/// OpenProcess() failed. Set once at construction — no synchronization
		/// needed for readers.
		const ProcessHandle& GetClientProcess() const { return m_ClientProcess; }
		ProcessHandle&		 GetClientProcess() { return m_ClientProcess; }

		static constexpr uint32_t MaxLogEntries = 10000;

	private:
		// Intern a logger name into m_LogArena and return the canonical
		// pointer for that name. Subsequent calls with the same string
		// return the same pointer. Caller must hold m_LogLock exclusive.
		const char* InternLoggerNameLocked(std::string_view Name);

		// Allocate a copy of Str into m_LogArena and return a null-
		// terminated pointer. No deduplication. Caller must hold m_LogLock
		// exclusive. Empty input returns "" (no allocation).
		const char* AllocateLogStringLocked(std::string_view Str);

		SessionInfo			   m_Info;
		Ref<SessionLog>		   m_Log;
		ProcessHandle		   m_ClientProcess;
		mutable RwLock		   m_LogLock;
		eastl::deque<LogEntry> m_LogEntries;
		uint64_t			   m_TotalAppended = 0;	 // monotonically increasing counter
		// String storage for the in-memory deque. LoggerName is interned
		// (one canonical copy per unique name); Message and Format are
		// duplicated per entry. Both die with the Session — so the
		// LogEntry pointers do too. Sized to fit a typical session's
		// strings in one chunk; spills to additional chunks otherwise.
		MemoryArena									  m_LogArena{4096};
		tsl::robin_map<std::string_view, const char*> m_InternedLoggerNames;
	};

	/// Construct a SessionsService. If StorageRoot is non-empty, session
	/// metadata and logs are persisted under that directory (one subdirectory
	/// per session id) and previously-persisted sessions are loaded as ended.
	explicit SessionsService(std::filesystem::path StorageRoot = {});
	~SessionsService();

	bool					  RegisterSession(const Oid&   SessionId,
											  std::string  AppName,
											  std::string  Mode,
											  std::string  Platform,
											  uint32_t	   ClientPid,
											  const Oid&   ParentSessionId,
											  const Oid&   JobId,
											  CbObjectView Metadata);
	bool					  UpdateSession(const Oid& SessionId, CbObjectView Metadata);
	Ref<Session>			  GetSession(const Oid& SessionId) const;
	std::vector<Ref<Session>> GetSessions() const;
	std::vector<Ref<Session>> GetEndedSessions() const;
	/// Ends a session. If Reason is non-empty, a synthetic log line is
	/// appended to the session log before it's moved to ended so the
	/// historical log has a clear closing event.
	bool	 RemoveSession(const Oid& SessionId, std::string_view Reason = {});
	uint64_t GetSessionCount() const;

	/// Appends a log entry to `SessionId` and, if the session exists,
	/// invokes the log-appended callback with the new cursor so downstream
	/// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta
	/// without polling. Returns the new cursor, or 0 if the session is
	/// unknown. Fires the callback AFTER any internal locks are released
	/// so the callback can safely call back into this service.
	uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input);

	/// Batch counterpart of AppendLog. Atomic with respect to other
	/// appends to the same session — entries land contiguously on the
	/// wire and persist in order — and fires exactly one push-callback
	/// for the whole batch. Empty batches and unknown sessions are
	/// no-ops returning 0.
	uint64_t AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs);

	/// Observer fired after an entry is appended to any session. Replaces
	/// any previously set callback. Pass {} to clear. Only one listener is
	/// supported — the single consumer today is the HTTP WebSocket push.
	using LogAppendedCallback = std::function<void(const Oid& SessionId, uint64_t NewCursor)>;
	void SetLogAppendedCallback(LogAppendedCallback Callback);

	/// Drop ended sessions that are too old, that push us over the count
	/// limit, or that push the on-disk footprint over the byte budget, and
	/// delete their persisted directories. Active sessions are never
	/// pruned. Returns the number removed by each criterion.
	struct PruneResult
	{
		size_t ExpiredByAge		= 0;
		size_t ExpiredByCount	= 0;
		size_t ExpiredByStorage = 0;
	};
	PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes);

	/// End any active session whose tracked client process is no longer
	/// running. Sessions with an invalid ProcessHandle (remote, or
	/// OpenProcess failed at registration) are skipped. Returns the number
	/// of sessions ended by this pass.
	size_t CheckProcessLiveness();

	// Tuning defaults. Expressed in whole days / bytes so they're easy to
	// override from a future command-line flag without touching internals.
	static constexpr int	  kDefaultMaxSessionAgeDays = 365;
	static constexpr size_t	  kDefaultMaxSessionCount	= 1000;
	static constexpr uint64_t kDefaultMaxStorageBytes	= 50ull * 1024 * 1024;	// 50 MiB

private:
	LoggerRef& Log() { return m_Log; }

	LoggerRef									   m_Log;
	mutable RwLock								   m_Lock;
	tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions;
	std::vector<Ref<Session>>					   m_EndedSessions;
	std::unique_ptr<SessionLogStore>			   m_SessionLogs;
	// Set once at wiring-time (single consumer), never reassigned while
	// hot, so no dedicated lock — just a plain member. Copy-on-call
	// guards against the theoretical re-register race below.
	LogAppendedCallback m_LogAppendedCallback;
};

}  // namespace zen