diff options
Diffstat (limited to 'src/zenserver/sessions/sessions.h')
| -rw-r--r-- | src/zenserver/sessions/sessions.h | 193 |
1 files changed, 174 insertions, 19 deletions
diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h index a84ca6506..a722704e0 100644 --- a/src/zenserver/sessions/sessions.h +++ b/src/zenserver/sessions/sessions.h @@ -4,6 +4,8 @@ #include <zencore/compactbinary.h> #include <zencore/logbase.h> +#include <zencore/memory/memoryarena.h> +#include <zencore/process.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -11,7 +13,10 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/deque.h> #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <filesystem> +#include <functional> #include <optional> +#include <span> #include <string> #include <vector> @@ -34,25 +39,62 @@ public: Oid Id; std::string AppName; std::string Mode; - Oid JobId; - CbObject Metadata; - DateTime CreatedAt; - DateTime UpdatedAt; - DateTime EndedAt{0}; + std::string Platform; // Reported by the client, e.g. "windows", "linux", "macos" + uint32_t ClientPid = 0; // Non-zero = local PID to probe for liveness. 0 = don't track. + Oid ParentSessionId; + // Optional task/action identifier used to associate this session with a + // specific unit of work. Distinct from ParentSessionId, which records + // process/session ancestry. + Oid JobId; + CbObject Metadata; + DateTime CreatedAt; + DateTime UpdatedAt; + DateTime EndedAt{0}; }; + /// Stored form of a log entry. The string fields are arena-borrowed + /// `const char*` — they live in the owning Session's MemoryArena and + /// are valid only for that Session's lifetime. Default copy is + /// intentionally shallow (string pointers are shared with the source); + /// callers must not let copies outlive the originating Session. + /// + /// Build entries via `LogEntryInput` and route them through + /// `Session::AppendLog` / `AppendLogBatch`, which intern logger names + /// and arena-allocate the other strings before storing. struct LogEntry { - DateTime Timestamp; - std::string Level; - std::string Message; - CbObject Data; + DateTime Timestamp{0}; + // Sentinel: Off means "no level set" (e.g. plain-text POSTed entries + // where the client didn't include a level). Real log entries use + // Trace..Critical, so Off is free to reuse as "omit on serialize". + logging::LogLevel Level = logging::Off; + // Arena pointers (null-terminated). Empty string is the default + // — never null, so callers don't need to guard. + const char* LoggerName = ""; // Interned: one canonical copy per unique name across the session. + const char* Message = ""; // For structured entries: the rendered form (populated at intake). + const char* Format = ""; // UE_LOGFMT template; "" for plain entries. + CbObject Fields; // Present only when Format is non-empty. + }; + + /// Input form used to build an entry on the way into a Session. The + /// string_view fields are caller-borrowed; AppendLog interns/copies + /// them into the Session's arena before any LogEntry is built. Use + /// this struct rather than constructing LogEntry directly so the + /// arena ownership invariant stays one-sided. + struct LogEntryInput + { + DateTime Timestamp{0}; + logging::LogLevel Level = logging::Off; + std::string_view LoggerName; + std::string_view Message; + std::string_view Format; + CbObject Fields; }; class Session : public TRefCounted<Session> { public: - Session(const SessionInfo& Info); + Session(const SessionInfo& Info, Ref<SessionLog> Log = {}, ProcessHandle ClientProcess = {}); ~Session(); Session(Session&&) = delete; @@ -67,12 +109,29 @@ public: void SetEndedAt(DateTime When) { m_Info.EndedAt = When; } - void AppendLog(LogEntry Entry); + /// Appends an entry to the in-memory deque and to the persisted + /// log. Returns the new cursor value (m_TotalAppended post- + /// increment). Logger name is interned, message and format are + /// arena-allocated — the input's string_views may safely be + /// caller-stack-bound. + uint64_t AppendLog(LogEntryInput Input); + + /// Append-many counterpart that takes the deque lock exactly once + /// for the whole batch. Use this when an inbound HTTP POST carries + /// multiple entries — single-lock semantics keep entries from one + /// caller contiguous on the wire even when other appends race in, + /// and the WS-push observer can fire just once for the whole batch. + /// Returns the new cursor (the value at the tail of the batch). + uint64_t AppendLogBatch(std::span<LogEntryInput> Inputs); + std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const; uint64_t GetLogCount() const; /// Returns entries appended after the given cursor and the new cursor value. /// A cursor of 0 returns all entries currently in the deque. + /// The returned LogEntries borrow strings from this Session's + /// arena — callers must hold a Ref<Session> for as long as they + /// keep the result. struct CursorResult { std::vector<LogEntry> Entries; @@ -81,26 +140,118 @@ public: }; CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const; + // Seed this session with pre-existing log entries (e.g. loaded from disk + // on startup). Sets the total-appended counter to reflect what was on + // disk so cursors remain meaningful for historical sessions. The inputs + // are interned/arena-allocated into this session. + void PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount); + + /// Process handle used for client-liveness checks. Acquired at + /// registration time (while the pid is known to refer to the reporting + /// process) and held for the session's lifetime; on Windows this is a + /// real HANDLE tied to the specific process instance and is immune to + /// pid reuse. Invalid (IsValid() == false) for remote sessions or when + /// OpenProcess() failed. Set once at construction — no synchronization + /// needed for readers. + const ProcessHandle& GetClientProcess() const { return m_ClientProcess; } + ProcessHandle& GetClientProcess() { return m_ClientProcess; } + + static constexpr uint32_t MaxLogEntries = 10000; + private: + // Intern a logger name into m_LogArena and return the canonical + // pointer for that name. Subsequent calls with the same string + // return the same pointer. Caller must hold m_LogLock exclusive. + const char* InternLoggerNameLocked(std::string_view Name); + + // Allocate a copy of Str into m_LogArena and return a null- + // terminated pointer. No deduplication. Caller must hold m_LogLock + // exclusive. Empty input returns "" (no allocation). + const char* AllocateLogStringLocked(std::string_view Str); + SessionInfo m_Info; Ref<SessionLog> m_Log; + ProcessHandle m_ClientProcess; mutable RwLock m_LogLock; eastl::deque<LogEntry> m_LogEntries; uint64_t m_TotalAppended = 0; // monotonically increasing counter - - static constexpr uint32_t MaxLogEntries = 10000; + // String storage for the in-memory deque. LoggerName is interned + // (one canonical copy per unique name); Message and Format are + // duplicated per entry. Both die with the Session — so the + // LogEntry pointers do too. Sized to fit a typical session's + // strings in one chunk; spills to additional chunks otherwise. + MemoryArena m_LogArena{4096}; + tsl::robin_map<std::string_view, const char*> m_InternedLoggerNames; }; - SessionsService(); + /// Construct a SessionsService. If StorageRoot is non-empty, session + /// metadata and logs are persisted under that directory (one subdirectory + /// per session id) and previously-persisted sessions are loaded as ended. + explicit SessionsService(std::filesystem::path StorageRoot = {}); ~SessionsService(); - bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata); - bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); - Ref<Session> GetSession(const Oid& SessionId) const; + bool RegisterSession(const Oid& SessionId, + std::string AppName, + std::string Mode, + std::string Platform, + uint32_t ClientPid, + const Oid& ParentSessionId, + const Oid& JobId, + CbObjectView Metadata); + bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); + Ref<Session> GetSession(const Oid& SessionId) const; std::vector<Ref<Session>> GetSessions() const; std::vector<Ref<Session>> GetEndedSessions() const; - bool RemoveSession(const Oid& SessionId); - uint64_t GetSessionCount() const; + /// Ends a session. If Reason is non-empty, a synthetic log line is + /// appended to the session log before it's moved to ended so the + /// historical log has a clear closing event. + bool RemoveSession(const Oid& SessionId, std::string_view Reason = {}); + uint64_t GetSessionCount() const; + + /// Appends a log entry to `SessionId` and, if the session exists, + /// invokes the log-appended callback with the new cursor so downstream + /// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta + /// without polling. Returns the new cursor, or 0 if the session is + /// unknown. Fires the callback AFTER any internal locks are released + /// so the callback can safely call back into this service. + uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input); + + /// Batch counterpart of AppendLog. Atomic with respect to other + /// appends to the same session — entries land contiguously on the + /// wire and persist in order — and fires exactly one push-callback + /// for the whole batch. Empty batches and unknown sessions are + /// no-ops returning 0. + uint64_t AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs); + + /// Observer fired after an entry is appended to any session. Replaces + /// any previously set callback. Pass {} to clear. Only one listener is + /// supported — the single consumer today is the HTTP WebSocket push. + using LogAppendedCallback = std::function<void(const Oid& SessionId, uint64_t NewCursor)>; + void SetLogAppendedCallback(LogAppendedCallback Callback); + + /// Drop ended sessions that are too old, that push us over the count + /// limit, or that push the on-disk footprint over the byte budget, and + /// delete their persisted directories. Active sessions are never + /// pruned. Returns the number removed by each criterion. + struct PruneResult + { + size_t ExpiredByAge = 0; + size_t ExpiredByCount = 0; + size_t ExpiredByStorage = 0; + }; + PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes); + + /// End any active session whose tracked client process is no longer + /// running. Sessions with an invalid ProcessHandle (remote, or + /// OpenProcess failed at registration) are skipped. Returns the number + /// of sessions ended by this pass. + size_t CheckProcessLiveness(); + + // Tuning defaults. Expressed in whole days / bytes so they're easy to + // override from a future command-line flag without touching internals. + static constexpr int kDefaultMaxSessionAgeDays = 365; + static constexpr size_t kDefaultMaxSessionCount = 1000; + static constexpr uint64_t kDefaultMaxStorageBytes = 50ull * 1024 * 1024; // 50 MiB private: LoggerRef& Log() { return m_Log; } @@ -110,6 +261,10 @@ private: tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions; std::vector<Ref<Session>> m_EndedSessions; std::unique_ptr<SessionLogStore> m_SessionLogs; + // Set once at wiring-time (single consumer), never reassigned while + // hot, so no dedicated lock — just a plain member. Copy-on-call + // guards against the theoretical re-register race below. + LogAppendedCallback m_LogAppendedCallback; }; } // namespace zen |