// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include #include #include #include #include namespace zen { class SessionLogStore; class SessionLog; /** Session tracker * * Acts as a log and session info concentrator when dealing with multiple * servers and external processes acting as a group. */ class SessionsService { public: struct SessionInfo { Oid Id; std::string AppName; std::string Mode; std::string Platform; // Reported by the client, e.g. "windows", "linux", "macos" uint32_t ClientPid = 0; // Non-zero = local PID to probe for liveness. 0 = don't track. Oid ParentSessionId; // Optional task/action identifier used to associate this session with a // specific unit of work. Distinct from ParentSessionId, which records // process/session ancestry. Oid JobId; CbObject Metadata; DateTime CreatedAt; DateTime UpdatedAt; DateTime EndedAt{0}; }; /// Stored form of a log entry. The string fields are arena-borrowed /// `const char*` — they live in the owning Session's MemoryArena and /// are valid only for that Session's lifetime. Default copy is /// intentionally shallow (string pointers are shared with the source); /// callers must not let copies outlive the originating Session. /// /// Build entries via `LogEntryInput` and route them through /// `Session::AppendLog` / `AppendLogBatch`, which intern logger names /// and arena-allocate the other strings before storing. struct LogEntry { DateTime Timestamp{0}; // Sentinel: Off means "no level set" (e.g. plain-text POSTed entries // where the client didn't include a level). Real log entries use // Trace..Critical, so Off is free to reuse as "omit on serialize". logging::LogLevel Level = logging::Off; // Arena pointers (null-terminated). Empty string is the default // — never null, so callers don't need to guard. const char* LoggerName = ""; // Interned: one canonical copy per unique name across the session. const char* Message = ""; // For structured entries: the rendered form (populated at intake). const char* Format = ""; // UE_LOGFMT template; "" for plain entries. CbObject Fields; // Present only when Format is non-empty. }; /// Input form used to build an entry on the way into a Session. The /// string_view fields are caller-borrowed; AppendLog interns/copies /// them into the Session's arena before any LogEntry is built. Use /// this struct rather than constructing LogEntry directly so the /// arena ownership invariant stays one-sided. struct LogEntryInput { DateTime Timestamp{0}; logging::LogLevel Level = logging::Off; std::string_view LoggerName; std::string_view Message; std::string_view Format; CbObject Fields; }; class Session : public TRefCounted { public: Session(const SessionInfo& Info, Ref Log = {}, ProcessHandle ClientProcess = {}); ~Session(); Session(Session&&) = delete; Session& operator=(Session&&) = delete; const SessionInfo& Info() const { return m_Info; } void UpdateMetadata(CbObjectView Metadata) { m_Info.Metadata = CbObject::Clone(Metadata); m_Info.UpdatedAt = DateTime::Now(); } void SetEndedAt(DateTime When) { m_Info.EndedAt = When; } /// Appends an entry to the in-memory deque and to the persisted /// log. Returns the new cursor value (m_TotalAppended post- /// increment). Logger name is interned, message and format are /// arena-allocated — the input's string_views may safely be /// caller-stack-bound. uint64_t AppendLog(LogEntryInput Input); /// Append-many counterpart that takes the deque lock exactly once /// for the whole batch. Use this when an inbound HTTP POST carries /// multiple entries — single-lock semantics keep entries from one /// caller contiguous on the wire even when other appends race in, /// and the WS-push observer can fire just once for the whole batch. /// Returns the new cursor (the value at the tail of the batch). uint64_t AppendLogBatch(std::span Inputs); std::vector GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const; uint64_t GetLogCount() const; /// Returns entries appended after the given cursor and the new cursor value. /// A cursor of 0 returns all entries currently in the deque. /// The returned LogEntries borrow strings from this Session's /// arena — callers must hold a Ref for as long as they /// keep the result. struct CursorResult { std::vector Entries; uint64_t Cursor; // new cursor for next poll uint64_t Count; // current deque size }; CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const; // Seed this session with pre-existing log entries (e.g. loaded from disk // on startup). Sets the total-appended counter to reflect what was on // disk so cursors remain meaningful for historical sessions. The inputs // are interned/arena-allocated into this session. void PreloadEntries(std::span Tail, uint64_t TotalCount); /// Process handle used for client-liveness checks. Acquired at /// registration time (while the pid is known to refer to the reporting /// process) and held for the session's lifetime; on Windows this is a /// real HANDLE tied to the specific process instance and is immune to /// pid reuse. Invalid (IsValid() == false) for remote sessions or when /// OpenProcess() failed. Set once at construction — no synchronization /// needed for readers. const ProcessHandle& GetClientProcess() const { return m_ClientProcess; } ProcessHandle& GetClientProcess() { return m_ClientProcess; } static constexpr uint32_t MaxLogEntries = 10000; private: // Intern a logger name into m_LogArena and return the canonical // pointer for that name. Subsequent calls with the same string // return the same pointer. Caller must hold m_LogLock exclusive. const char* InternLoggerNameLocked(std::string_view Name); // Allocate a copy of Str into m_LogArena and return a null- // terminated pointer. No deduplication. Caller must hold m_LogLock // exclusive. Empty input returns "" (no allocation). const char* AllocateLogStringLocked(std::string_view Str); SessionInfo m_Info; Ref m_Log; ProcessHandle m_ClientProcess; mutable RwLock m_LogLock; eastl::deque m_LogEntries; uint64_t m_TotalAppended = 0; // monotonically increasing counter // String storage for the in-memory deque. LoggerName is interned // (one canonical copy per unique name); Message and Format are // duplicated per entry. Both die with the Session — so the // LogEntry pointers do too. Sized to fit a typical session's // strings in one chunk; spills to additional chunks otherwise. MemoryArena m_LogArena{4096}; tsl::robin_map m_InternedLoggerNames; }; /// Construct a SessionsService. If StorageRoot is non-empty, session /// metadata and logs are persisted under that directory (one subdirectory /// per session id) and previously-persisted sessions are loaded as ended. explicit SessionsService(std::filesystem::path StorageRoot = {}); ~SessionsService(); bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, std::string Platform, uint32_t ClientPid, const Oid& ParentSessionId, const Oid& JobId, CbObjectView Metadata); bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); Ref GetSession(const Oid& SessionId) const; std::vector> GetSessions() const; std::vector> GetEndedSessions() const; /// Ends a session. If Reason is non-empty, a synthetic log line is /// appended to the session log before it's moved to ended so the /// historical log has a clear closing event. bool RemoveSession(const Oid& SessionId, std::string_view Reason = {}); uint64_t GetSessionCount() const; /// Appends a log entry to `SessionId` and, if the session exists, /// invokes the log-appended callback with the new cursor so downstream /// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta /// without polling. Returns the new cursor, or 0 if the session is /// unknown. Fires the callback AFTER any internal locks are released /// so the callback can safely call back into this service. uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input); /// Batch counterpart of AppendLog. Atomic with respect to other /// appends to the same session — entries land contiguously on the /// wire and persist in order — and fires exactly one push-callback /// for the whole batch. Empty batches and unknown sessions are /// no-ops returning 0. uint64_t AppendLogBatch(const Oid& SessionId, std::span Inputs); /// Observer fired after an entry is appended to any session. Replaces /// any previously set callback. Pass {} to clear. Only one listener is /// supported — the single consumer today is the HTTP WebSocket push. using LogAppendedCallback = std::function; void SetLogAppendedCallback(LogAppendedCallback Callback); /// Drop ended sessions that are too old, that push us over the count /// limit, or that push the on-disk footprint over the byte budget, and /// delete their persisted directories. Active sessions are never /// pruned. Returns the number removed by each criterion. struct PruneResult { size_t ExpiredByAge = 0; size_t ExpiredByCount = 0; size_t ExpiredByStorage = 0; }; PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes); /// End any active session whose tracked client process is no longer /// running. Sessions with an invalid ProcessHandle (remote, or /// OpenProcess failed at registration) are skipped. Returns the number /// of sessions ended by this pass. size_t CheckProcessLiveness(); // Tuning defaults. Expressed in whole days / bytes so they're easy to // override from a future command-line flag without touching internals. static constexpr int kDefaultMaxSessionAgeDays = 365; static constexpr size_t kDefaultMaxSessionCount = 1000; static constexpr uint64_t kDefaultMaxStorageBytes = 50ull * 1024 * 1024; // 50 MiB private: LoggerRef& Log() { return m_Log; } LoggerRef m_Log; mutable RwLock m_Lock; tsl::robin_map, Oid::Hasher> m_Sessions; std::vector> m_EndedSessions; std::unique_ptr m_SessionLogs; // Set once at wiring-time (single consumer), never reassigned while // hot, so no dedicated lock — just a plain member. Copy-on-call // guards against the theoretical re-register race below. LogAppendedCallback m_LogAppendedCallback; }; } // namespace zen