diff options
Diffstat (limited to 'src/zenserver/sessions')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 504 | ||||
| -rw-r--r-- | src/zenserver/sessions/httpsessions.h | 60 | ||||
| -rw-r--r-- | src/zenserver/sessions/inprocsessionlogsink.cpp | 37 | ||||
| -rw-r--r-- | src/zenserver/sessions/logtemplate.cpp | 390 | ||||
| -rw-r--r-- | src/zenserver/sessions/logtemplate.h | 42 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.cpp | 1166 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.h | 193 |
7 files changed, 2262 insertions, 130 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp index 88db36828..1678ede60 100644 --- a/src/zenserver/sessions/httpsessions.cpp +++ b/src/zenserver/sessions/httpsessions.cpp @@ -7,8 +7,17 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/trace.h> +#include "logtemplate.h" #include "sessions.h" +ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_list.h> +#include <EASTL/fixed_vector.h> +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <limits> + namespace zen { using namespace std::literals; @@ -21,13 +30,21 @@ HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, , m_StatsService(StatsService) , m_Sessions(Sessions) , m_PushTimer(IoContext) +, m_CleanupTimer(IoContext) +, m_LivenessTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { + // Break the callback edge before tearing anything else down so a + // late AppendLog on another thread can't fire BroadcastLogAppended + // after our subscriber list is gone. + m_Sessions.SetLogAppendedCallback({}); m_PushTimer.cancel(); + m_CleanupTimer.cancel(); + m_LivenessTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } @@ -135,12 +152,36 @@ HttpSessionsService::Initialize() m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); + // Event-driven log push: the service fires this every time an entry + // is appended (including the synthetic "session ended" line emitted + // by RemoveSession). Subscribers receive a binary CB frame carrying + // the delta. Safe to call BroadcastLogAppended from any thread — it + // does its own locking and SendBinary is async-queued by the WS + // transport. + m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); }); + EnqueuePushTimer(); + + // Run a cleanup pass shortly after startup so freshly-loaded historical + // data is pruned even if the server doesn't stay up for an hour. + m_CleanupTimer.expires_after(std::chrono::seconds(30)); + m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunCleanup(); + EnqueueCleanupTimer(); + }); + + EnqueueLivenessTimer(); } static void -WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) +WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session) { + const SessionsService::SessionInfo& Info = Session.Info(); + Writer << "id" << Info.Id; if (!Info.AppName.empty()) { @@ -150,6 +191,18 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "mode" << Info.Mode; } + if (!Info.Platform.empty()) + { + Writer << "platform" << Info.Platform; + } + if (Info.ClientPid != 0) + { + Writer << "pid" << Info.ClientPid; + } + if (Info.ParentSessionId != Oid::Zero) + { + Writer << "parent_session_id" << Info.ParentSessionId; + } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; @@ -161,6 +214,11 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) Writer << "ended_at" << Info.EndedAt; } + if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0) + { + Writer << "log_count" << LogCount; + } + if (Info.Metadata.GetSize() > 0) { Writer.AddObject("metadata"sv, Info.Metadata); @@ -182,13 +240,13 @@ HttpSessionsService::BuildSessionListResponse() for (const Ref<SessionsService::Session>& Session : Active) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } for (const Ref<SessionsService::Session>& Session : Ended) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); @@ -231,7 +289,7 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) for (const Ref<SessionsService::Session>& Session : Sessions) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); @@ -262,24 +320,51 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) { CbObject RequestObject = ServerRequest.ReadPayloadObject(); + // Render the id into a stack buffer once for any success-reply + // paths below — avoids a std::string per POST/PUT. + char IdBuf[Oid::StringLength + 1] = {}; + SessionId.ToString(IdBuf); + const std::string_view IdStr(IdBuf, Oid::StringLength); + if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); std::string Mode(RequestObject["mode"sv].AsString()); - Oid JobId = RequestObject["jobid"sv].AsObjectId(); - CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); + std::string Platform(RequestObject["platform"sv].AsString()); + Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId(); + Oid JobId = RequestObject["jobid"sv].AsObjectId(); + CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); + + // Only trust a client-reported pid when the HTTP layer + // says the request is local (unix socket or a loopback + // TCP peer). A remote client's pid refers to a different + // machine's process table — opening a local handle with + // it would at best be meaningless, at worst a liveness + // false positive. + uint32_t ClientPid = 0; + if (ServerRequest.IsLocalMachineRequest()) + { + ClientPid = RequestObject["pid"sv].AsUInt32(); + } m_SessionsStats.SessionWriteCount++; - if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) + if (m_Sessions.RegisterSession(SessionId, + std::move(AppName), + std::move(Mode), + std::move(Platform), + ClientPid, + ParentSessionId, + JobId, + MetadataView)) { - return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr); } else { // Already exists - try update instead if (m_Sessions.UpdateSession(SessionId, MetadataView)) { - return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError); } @@ -290,7 +375,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) m_SessionsStats.SessionWriteCount++; if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView())) { - return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, @@ -304,7 +389,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) if (Session) { CbObjectWriter Response; - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); @@ -312,7 +397,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) case HttpVerb::kDelete: { m_SessionsStats.SessionDeleteCount++; - if (m_Sessions.RemoveSession(SessionId)) + if (m_Sessions.RemoveSession(SessionId, "client request"sv)) { return ServerRequest.WriteResponse(HttpResponseCode::OK); } @@ -334,17 +419,33 @@ static void WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) { Writer << "timestamp" << Entry.Timestamp; - if (!Entry.Level.empty()) + if (Entry.Level != logging::Off) { - Writer << "level" << Entry.Level; + // Frontend renders on the string form (CSS class derives from it), so + // keep the wire format as the canonical lowercase name. + Writer << "level" << logging::ToString(Entry.Level); } - if (!Entry.Message.empty()) + const std::string_view LoggerName{Entry.LoggerName}; + if (!LoggerName.empty()) { - Writer << "message" << Entry.Message; + Writer << "logger" << LoggerName; } - if (Entry.Data.GetSize() > 0) + const std::string_view Message{Entry.Message}; + if (!Message.empty()) { - Writer.AddObject("data"sv, Entry.Data); + Writer << "message" << Message; + } + // Structured-log form alongside the rendered message so a future UI + // can offer field-level drill-down without another schema bump. The + // existing UI only looks at "message" and is unaffected. + const std::string_view Format{Entry.Format}; + if (!Format.empty()) + { + Writer << "format" << Format; + if (Entry.Fields.GetSize() > 0) + { + Writer.AddObject("fields"sv, Entry.Fields); + } } } @@ -378,12 +479,21 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) if (ServerRequest.RequestContentType() == HttpContentType::kText) { - // Raw text - split by newlines, one entry per line + // Raw text - split by newlines, one entry per line. Collect + // into a batch and append atomically: keeps a single client's + // payload contiguous on the wire even when other clients race + // in, and fires the WS push observer just once for the whole + // batch instead of once per line. IoBuffer Payload = ServerRequest.ReadPayload(); std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); const DateTime Now = DateTime::Now(); - size_t Pos = 0; + // 64 inline slots covers the typical SendLogBatch posting size + // (~50) without touching the heap. Spills to heap beyond that. + // LogEntryInput's string_views point into the request payload + // (Text), which lives for the duration of this handler. + eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch; + size_t Pos = 0; while (Pos < Text.size()) { size_t End = Text.find('\n', Pos); @@ -401,60 +511,115 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) if (!Line.empty()) { - Session->AppendLog(SessionsService::LogEntry{ + Batch.push_back(SessionsService::LogEntryInput{ .Timestamp = Now, - .Message = std::string(Line), + .Message = Line, }); } Pos = End + 1; } + m_Sessions.AppendLogBatch(SessionId, Batch); } else { - // Structured log (JSON or CbObject) - // Accepts a single record or an "entries" array of records + // Structured log (JSON or CbObject). Accepts a single record + // or an "entries" array of records — collect into a batch so + // a single POST lands atomically and fires one WS push. CbObject RequestObject = ServerRequest.ReadPayloadObject(); const DateTime Now = DateTime::Now(); + // 64 inline slots covers the typical SendLogBatch posting size + // (~50) without touching the heap. Spills to heap beyond that. + // LogEntryInput's string_views borrow from the parsed + // RequestObject's underlying buffer (the logger / message / + // format strings on the wire); we keep RequestObject alive + // for the whole intake. + eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch; + + // Stable backing for messages we render from a structured + // template. fixed_list never moves nodes on insertion, so + // string_views into these strings stay valid until the list + // is destroyed at handler exit. 64 inline nodes match the + // batch's fixed-vector inline cap; spills to heap if a POST + // brings more. + eastl::fixed_list<std::string, 64> RenderedMessages; + auto AppendFromObject = [&](CbObjectView Obj) { - CbFieldView LevelField = Obj["level"sv]; - std::string_view Level; + CbFieldView LevelField = Obj["level"sv]; + logging::LogLevel Level = logging::Off; if (LevelField.IsString()) { - Level = LevelField.AsString(); + Level = logging::ParseLogLevelString(LevelField.AsString()); } else if (LevelField.IsInteger()) { int32_t LevelInt = LevelField.AsInt32(); if (LevelInt >= 0 && LevelInt < logging::LogLevelCount) { - Level = logging::ToString(static_cast<logging::LogLevel>(LevelInt)); + Level = static_cast<logging::LogLevel>(LevelInt); } } - std::string Message(Obj["message"sv].AsString()); - CbObjectView DataView = Obj["data"sv].AsObjectView(); - - Session->AppendLog(SessionsService::LogEntry{ - .Timestamp = Now, - .Level = std::string(Level), - .Message = std::move(Message), - .Data = CbObject::Clone(DataView), + const std::string_view LoggerName = Obj["logger"sv].AsString(); + + // Two entry shapes. Structured entries carry `format` + + // `fields` and no `message` — we render the template right + // here so the rest of the pipeline (in-memory deque, + // persisted log.bin, UI GET response) keeps working the + // same way for both shapes. + CbFieldView FormatField = Obj["format"sv]; + if (FormatField.IsString()) + { + const std::string_view FormatView = FormatField.AsString(); + CbObjectView FieldsView = Obj["fields"sv].AsObjectView(); + ExtendableStringBuilder<256> RenderedBuilder; + RenderLogTemplate(FormatView, FieldsView, RenderedBuilder); + + // Anchor the rendered string in the stable list so the + // LogEntryInput's view into it stays valid until the + // AppendLogBatch call below. + RenderedMessages.emplace_back(RenderedBuilder.ToView()); + const std::string& StoredRendered = RenderedMessages.back(); + + Batch.push_back(SessionsService::LogEntryInput{ + .Timestamp = Now, + .Level = Level, + .LoggerName = LoggerName, + .Message = StoredRendered, + .Format = FormatView, + .Fields = CbObject::Clone(FieldsView), + }); + return; + } + + // Plain entry. + Batch.push_back(SessionsService::LogEntryInput{ + .Timestamp = Now, + .Level = Level, + .LoggerName = LoggerName, + .Message = Obj["message"sv].AsString(), }); }; CbFieldView EntriesField = RequestObject["entries"sv]; if (EntriesField.IsArray()) { - for (CbFieldView Entry : EntriesField) + // Pre-reserve so the 50-ish entries from a typical + // SendLogBatch don't trigger 4-5 reallocations as the + // vector grows. + CbArrayView Arr = EntriesField.AsArrayView(); + Batch.reserve(Arr.Num()); + for (CbFieldView Entry : Arr) { AppendFromObject(Entry.AsObjectView()); } } else { + Batch.reserve(1); AppendFromObject(RequestObject); } + m_Sessions.AppendLogBatch(SessionId, Batch); } return ServerRequest.WriteResponse(HttpResponseCode::OK); @@ -547,13 +712,78 @@ HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::s { ZEN_UNUSED(RelativeUri); ZEN_INFO("Sessions WebSocket client connected"); - m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); + const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed); + m_WsConnectionsLock.WithExclusiveLock( + [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); }); } void -HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) { - // No client-to-server messages expected + // Expected client→server protocol is JSON text frames; see + // sessions.js → _ws_send. Binary frames and malformed JSON are logged + // at debug and ignored so a confused client can't disturb others. + if (Msg.Opcode != WebSocketOpcode::kText) + { + return; + } + std::string_view PayloadText(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize()); + std::string ParseError; + json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError); + if (!ParseError.empty() || !Parsed.is_object()) + { + ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError); + return; + } + + const std::string& Type = Parsed["type"].string_value(); + if (Type == "sub_log") + { + const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value()); + if (SessionId == Oid::Zero) + { + ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value()); + return; + } + // json11 reports int via int_value() (32-bit); cursors fit easily + // inside a session's lifetime so this is fine for the foreseeable + // future. Negative values are treated as 0. + const int CursorRaw = Parsed["cursor"].int_value(); + const uint64_t Cursor = CursorRaw > 0 ? static_cast<uint64_t>(CursorRaw) : 0; + + // Record the subscription and fire an immediate delta so we don't + // drop entries that landed between the client's HTTP replay and + // this frame. See BroadcastLogAppended for the broadcast flow. + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (Sub.Connection.Get() == &Conn) + { + Sub.SubscribedSessionId = SessionId; + Sub.LastSentCursor = Cursor; + break; + } + } + }); + // Pass UINT64_MAX to force a flush even if the cursor hasn't + // advanced — the subscriber's LastSentCursor may already lag the + // tail (e.g. rapid posts before the client subscribed). + BroadcastLogAppended(SessionId, std::numeric_limits<uint64_t>::max()); + } + else if (Type == "unsub_log") + { + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (Sub.Connection.Get() == &Conn) + { + Sub.Unsubscribe(); + break; + } + } + }); + } + // Unknown types are silently ignored so the protocol can grow. } void @@ -561,8 +791,8 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused] { ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { - auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { - return C.Get() == &Conn; + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) { + return Sub.Connection.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); @@ -571,8 +801,15 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused] void HttpSessionsService::BroadcastSessions() { - std::vector<Ref<WebSocketConnection>> Connections; - m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + // 8 inline slots covers any realistic number of concurrent UI tabs; + // spills to heap beyond that. + eastl::fixed_vector<Ref<WebSocketConnection>, 8> Connections; + m_WsConnectionsLock.WithSharedLock([&] { + for (const WsSubscriber& Sub : m_WsConnections) + { + Connections.push_back(Sub.Connection); + } + }); if (Connections.empty()) { @@ -593,6 +830,107 @@ HttpSessionsService::BroadcastSessions() } void +HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor) +{ + Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId); + if (!Session) + { + // Session vanished (e.g. pruned) between the append and the + // broadcast. No entries to ship. + return; + } + + // Claim each subscriber's cursor and snapshot its delta atomically under + // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather + // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the + // race where two concurrent BroadcastLogAppended calls would both + // observe the same FromCursor, fetch overlapping ranges, and ship the + // subscriber duplicate entries. Sends still happen after the lock is + // released to avoid holding it across async socket I/O. + struct PendingSend + { + Ref<WebSocketConnection> Connection; + SessionsService::Session::CursorResult Delta; + bool InitialSend; // true when FromCursor == 0 + }; + // 8 inline slots keeps the broadcast allocation-free for the typical UI + // case (1-2 tabs tailing one session); spills to heap if many clients + // happen to subscribe to the same session at once. + eastl::fixed_vector<PendingSend, 8> Sends; + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (!Sub.IsSubscribedTo(SessionId)) + { + continue; + } + // Cheap gate: if the subscriber already has everything up to + // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush. + if (NewCursor != std::numeric_limits<uint64_t>::max() && Sub.LastSentCursor >= NewCursor) + { + continue; + } + if (!Sub.Connection->IsOpen()) + { + continue; + } + const uint64_t FromCursor = Sub.LastSentCursor; + SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor); + Sub.LastSentCursor = Delta.Cursor; + Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0}); + } + }); + if (Sends.empty()) + { + return; + } + + // Render the hex id into a stack buffer — CbWriter only needs a + // string_view, so we avoid the 24-byte std::string allocation that + // Oid::ToString() would otherwise do on every broadcast. The buffer + // is StringLength + 1 because ToString writes a trailing NUL beyond + // the 24 hex chars; the view itself excludes the NUL. + char HexSessionIdBuf[Oid::StringLength + 1]; + SessionId.ToString(HexSessionIdBuf); + const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength); + for (const PendingSend& Send : Sends) + { + if (Send.Delta.Entries.empty() && !Send.InitialSend) + { + // Nothing new and the subscriber was primed — nothing to send. + continue; + } + + // Binary CB frame — the client already has a CB parser + // (util/compactbinary.js). CB keeps structured entries typed end- + // to-end (hashes, ints, dates stay that way on the wire) and skips + // JSON escaping overhead on every append. Shape mirrors the HTTP + // GET response plus two routing fields (type + session). A fresh + // CbObjectWriter per iteration is required because the ctor calls + // BeginObject() to set up the implicit outer object — Save() then + // finalizes that object, leaving the writer in a state that + // Reset() doesn't restore. + CbObjectWriter Response; + Response << "type"sv + << "log"sv; + Response << "session"sv << HexSessionId; + Response << "cursor"sv << Send.Delta.Cursor; + Response << "count"sv << Send.Delta.Count; + Response.BeginArray("entries"sv); + for (const SessionsService::LogEntry& Entry : Send.Delta.Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + CbObject Obj = Response.Save(); + Send.Connection->SendBinary(Obj.GetView()); + } +} + +void HttpSessionsService::EnqueuePushTimer() { m_PushTimer.expires_after(std::chrono::seconds(2)); @@ -607,4 +945,82 @@ HttpSessionsService::EnqueuePushTimer() }); } +////////////////////////////////////////////////////////////////////////// +// +// Periodic cleanup of expired / excess sessions +// + +void +HttpSessionsService::RunCleanup() +{ + const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0); + const size_t MaxCount = SessionsService::kDefaultMaxSessionCount; + const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes; + const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes); + if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0) + { + ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)", + Result.ExpiredByAge, + Result.ExpiredByCount, + Result.ExpiredByStorage, + SessionsService::kDefaultMaxSessionAgeDays, + MaxCount, + MaxBytes / (1024 * 1024)); + } +} + +void +HttpSessionsService::EnqueueCleanupTimer() +{ + m_CleanupTimer.expires_after(std::chrono::hours(1)); + m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunCleanup(); + EnqueueCleanupTimer(); + }); +} + +////////////////////////////////////////////////////////////////////////// +// +// Periodic liveness check for tracked local client processes +// + +void +HttpSessionsService::RunLivenessCheck() +{ + const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness(); + if (EndedByDeadClient > 0) + { + ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient); + } + else + { + // Debug-level so this doesn't spam at info every 30s, but lets an + // operator who's specifically investigating why their crashed + // session didn't clean up see whether anything is being tracked. + ZEN_DEBUG("Sessions liveness: no dead client processes found"); + } +} + +void +HttpSessionsService::EnqueueLivenessTimer() +{ + // 30s strikes a balance between crash-detection latency and + // per-session OpenProcess/GetExitCode overhead. Active sessions with + // no reported pid (remote clients) are skipped in the inner loop so + // the cost scales with local sessions only. + m_LivenessTimer.expires_after(std::chrono::seconds(30)); + m_LivenessTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunLivenessCheck(); + EnqueueLivenessTimer(); + }); +} + } // namespace zen diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h index 6ebe61c8d..2c0185176 100644 --- a/src/zenserver/sessions/httpsessions.h +++ b/src/zenserver/sessions/httpsessions.h @@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio/steady_timer.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> + namespace zen { class SessionsService; @@ -69,14 +71,64 @@ private: SessionsStats m_SessionsStats; metrics::OperationTiming m_HttpRequests; - // WebSocket push - RwLock m_WsConnectionsLock; - std::vector<Ref<WebSocketConnection>> m_WsConnections; - asio::steady_timer m_PushTimer; + // WebSocket push. + // + // Each connection can subscribe to a single session's log stream. The + // subscription is optional (SubscribedSessionId == Oid::Zero means + // "session-list broadcasts only"). LastSentCursor is the cursor value + // most recently delivered for the subscribed session; the broadcaster + // uses it to pull the correct delta from the service. + // + // Id is a process-monotonic generation token assigned at OnWebSocket- + // Open. Pointer matching is fine for OnWebSocketMessage / + // OnWebSocketClose where the live `WebSocketConnection&` parameter is + // unambiguous; Id-based matching is reserved for any future code path + // that wants to refer to a subscriber across lock releases without + // risking a slot-reuse mix-up. + struct WsSubscriber + { + Ref<WebSocketConnection> Connection; + uint64_t Id = 0; + Oid SubscribedSessionId = Oid::Zero; + uint64_t LastSentCursor = 0; + + // True iff the subscriber is currently subscribed to `Session`. + // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel + // check that broadcaster, cursor-bump path, and any future filter + // would otherwise each open-code. + bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; } + + // Drop the active subscription. After this returns, IsSubscribedTo + // is false for every session id. + void Unsubscribe() + { + SubscribedSessionId = Oid::Zero; + LastSentCursor = 0; + } + }; + RwLock m_WsConnectionsLock; + std::vector<WsSubscriber> m_WsConnections; + std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned" + asio::steady_timer m_PushTimer; void BroadcastSessions(); void EnqueuePushTimer(); + // Event-driven log push. Called from SessionsService's log-appended + // callback; iterates subscribers of the given session and ships any + // entries they haven't seen yet. + void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor); + + // Periodic cleanup of old / excess ended sessions + asio::steady_timer m_CleanupTimer; + void EnqueueCleanupTimer(); + void RunCleanup(); + + // Periodic client-process liveness check for locally-connected sessions. + asio::steady_timer m_LivenessTimer; + void EnqueueLivenessTimer(); + void RunLivenessCheck(); + Oid m_SelfSessionId = Oid::Zero; CbObject BuildSessionListResponse(); diff --git a/src/zenserver/sessions/inprocsessionlogsink.cpp b/src/zenserver/sessions/inprocsessionlogsink.cpp index 04c5f7312..c935522bc 100644 --- a/src/zenserver/sessions/inprocsessionlogsink.cpp +++ b/src/zenserver/sessions/inprocsessionlogsink.cpp @@ -12,28 +12,31 @@ static constexpr uint64_t UnixEpochBiasSeconds = uint64_t(double(1970 - 1) * 365 static DateTime TimePointToDateTime(logging::LogClock::time_point Time) { - auto Duration = Time.time_since_epoch(); - auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration); - uint64_t Ticks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond; - return DateTime{Ticks}; + // DateTime ticks are 100 ns each. Splitting the time_point into whole-second + // and sub-second parts and converting both lets us preserve sub-second + // precision; the previous implementation truncated to seconds, which made + // every entry land at .000 ms in tail / dashboard renderings. + auto Duration = Time.time_since_epoch(); + auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration); + auto SubSecondNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(Duration - Seconds); + uint64_t SecondsTicks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond; + uint64_t SubSecondTicks = static_cast<uint64_t>(SubSecondNanos.count()) / static_cast<uint64_t>(TimeSpan::NanosecondsPerTick); + return DateTime{SecondsTicks + SubSecondTicks}; } void InProcSessionLogSink::Log(const logging::LogMessage& Msg) { - Ref<SessionsService::Session> Session = m_Service.GetSession(m_SessionId); - if (!Session) - { - return; - } - - SessionsService::LogEntry Entry{ - .Timestamp = TimePointToDateTime(Msg.GetTime()), - .Level = std::string(logging::ToString(Msg.GetLevel())), - .Message = std::string(Msg.GetPayload()), - }; - - Session->AppendLog(std::move(Entry)); + // Route through the service-level AppendLog so the log-appended + // callback fires — otherwise WS subscribers tailing the self-session + // don't see in-proc lines until they reload and re-fetch via HTTP. + m_Service.AppendLog(m_SessionId, + SessionsService::LogEntryInput{ + .Timestamp = TimePointToDateTime(Msg.GetTime()), + .Level = Msg.GetLevel(), + .LoggerName = Msg.GetLoggerName(), + .Message = Msg.GetPayload(), + }); } } // namespace zen diff --git a/src/zenserver/sessions/logtemplate.cpp b/src/zenserver/sessions/logtemplate.cpp new file mode 100644 index 000000000..b4d8f37e8 --- /dev/null +++ b/src/zenserver/sessions/logtemplate.cpp @@ -0,0 +1,390 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "logtemplate.h" + +#include <zencore/fmtutils.h> +#include <zencore/guid.h> +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <zencore/uid.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { +using namespace std::literals; + +namespace { + + // Bounded recursion so pathological nesting (e.g. an object that references + // itself through $format) can't stack-overflow the server. Depth counts + // every nested template expansion OR value descent. + constexpr size_t kMaxRecursionDepth = 16; + + void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth); + void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth); + + ////////////////////////////////////////////////////////////////////////// + // + // Path resolution: walk a UE field_path — `name` followed by zero or + // more `.name` / `[N]` segments — starting from the fields root. Returns + // an empty field on any miss. + // + + CbFieldView ResolvePath(CbObjectView Root, std::string_view Path) + { + CbFieldView Cur; + bool Entered = false; // have we applied at least one segment? + + const auto ApplyName = [&](std::string_view Name) -> bool { + if (Name.empty()) + { + return false; + } + if (!Entered) + { + Cur = Root[Name]; + } + else + { + if (!Cur.IsObject()) + { + return false; + } + Cur = Cur.AsObjectView()[Name]; + } + Entered = true; + return Cur.operator bool(); + }; + + const auto ApplyIndex = [&](uint64_t Idx) -> bool { + if (!Entered || !Cur.IsArray()) + { + return false; + } + uint64_t N = 0; + for (CbFieldView Elem : Cur.AsArrayView().CreateViewIterator()) + { + if (N == Idx) + { + Cur = Elem; + return Cur.operator bool(); + } + ++N; + } + return false; + }; + + size_t i = 0; + while (i < Path.size()) + { + const char C = Path[i]; + if (C == '.') + { + ++i; + continue; + } + if (C == '[') + { + const size_t End = Path.find(']', i + 1); + if (End == std::string_view::npos) + { + return {}; + } + uint64_t Idx = 0; + for (size_t j = i + 1; j < End; ++j) + { + const char D = Path[j]; + if (D < '0' || D > '9') + { + return {}; + } + Idx = Idx * 10 + uint64_t(D - '0'); + } + if (!ApplyIndex(Idx)) + { + return {}; + } + i = End + 1; + continue; + } + // Name segment: run until the next '.' or '['. + const size_t NameStart = i; + while (i < Path.size() && Path[i] != '.' && Path[i] != '[') + { + ++i; + } + if (!ApplyName(Path.substr(NameStart, i - NameStart))) + { + return {}; + } + } + return Entered ? Cur : CbFieldView{}; + } + + ////////////////////////////////////////////////////////////////////////// + // + // Primitive rendering. Uses natural string forms for each CbField type. + // Non-string values are emitted without quotes — the caller (the JSON-ish + // fallback) adds quotes only around string values. + // + + void RenderPrimitive(CbFieldView Field, StringBuilderBase& Out) + { + if (Field.IsString()) + { + Out << Field.AsString(); + return; + } + if (Field.IsBool()) + { + Out << (Field.AsBool() ? "true"sv : "false"sv); + return; + } + if (Field.IsInteger()) + { + Out << Field.AsInt64(); + return; + } + if (Field.IsFloat()) + { + // format_to into the builder directly — avoids the std::string + // fmt::format would otherwise build just to hand to Append. + fmt::format_to(StringBuilderAppender(Out), "{}", Field.AsDouble()); + return; + } + if (Field.IsDateTime()) + { + Out.Append(Field.AsDateTime().ToIso8601()); + return; + } + if (Field.IsObjectId()) + { + // ToString(char[]) writes the 24-char hex into a caller buffer; + // the std::string overload would allocate. + char Buf[Oid::StringLength + 1] = {}; + Field.AsObjectId().ToString(Buf); + Out << std::string_view(Buf, Oid::StringLength); + return; + } + if (Field.IsHash()) + { + // Appender overload writes the 40-char hex directly into the + // builder; the std::string overload would allocate. + Field.AsHash().ToHexString(Out); + return; + } + if (Field.IsUuid()) + { + Guid G = Field.AsUuid(); + G.ToString(Out); + return; + } + if (Field.IsNull()) + { + Out << "null"sv; + return; + } + // Binary / attachment / custom / unknown → emit nothing rather than + // a stream of garbage bytes. + } + + ////////////////////////////////////////////////////////////////////////// + // + // JSON-ish fallback for bare objects / nested arrays. Compact single-line + // with quoted string keys and string values, raw other types. Intended + // for debug display — not strictly RFC-8259 JSON. + // + + void AppendJsonishString(std::string_view S, StringBuilderBase& Out) + { + Out << '"'; + for (char C : S) + { + switch (C) + { + case '"': + Out << "\\\""sv; + break; + case '\\': + Out << "\\\\"sv; + break; + case '\n': + Out << "\\n"sv; + break; + case '\r': + Out << "\\r"sv; + break; + case '\t': + Out << "\\t"sv; + break; + default: + Out << C; + break; + } + } + Out << '"'; + } + + void AppendJsonishValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth) + { + if (Field.IsString()) + { + AppendJsonishString(Field.AsString(), Out); + return; + } + // Non-string leaves and nested objects/arrays go through RenderValue + // so object short-circuits ($text / $format / ...) still apply. + RenderValue(Field, Out, Localized, Depth); + } + + ////////////////////////////////////////////////////////////////////////// + // + // Value rendering (the decision tree from the plan). + // + + void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth) + { + if (Depth >= kMaxRecursionDepth) + { + Out << "…"sv; + return; + } + if (Field.IsObject()) + { + CbObjectView Obj = Field.AsObjectView(); + + if (CbFieldView Text = Obj["$text"sv]; Text.IsString()) + { + Out << Text.AsString(); + return; + } + if (CbFieldView Format = Obj["$format"sv]; Format.IsString()) + { + RenderTemplateInto(Format.AsString(), Obj, Out, /*Localized=*/false, Depth + 1); + return; + } + if (CbFieldView LocFormat = Obj["$locformat"sv]; LocFormat.IsString()) + { + RenderTemplateInto(LocFormat.AsString(), Obj, Out, /*Localized=*/true, Depth + 1); + return; + } + + // Bare object — JSON-ish fallback. + Out << '{'; + bool First = true; + for (CbFieldView Entry : Obj.CreateViewIterator()) + { + if (!First) + { + Out << ", "sv; + } + First = false; + AppendJsonishString(Entry.GetName(), Out); + Out << ": "sv; + AppendJsonishValue(Entry, Out, Localized, Depth + 1); + } + Out << '}'; + return; + } + if (Field.IsArray()) + { + Out << '['; + bool First = true; + for (CbFieldView Elem : Field.AsArrayView().CreateViewIterator()) + { + if (!First) + { + Out << ", "sv; + } + First = false; + AppendJsonishValue(Elem, Out, Localized, Depth + 1); + } + Out << ']'; + return; + } + RenderPrimitive(Field, Out); + } + + ////////////////////////////////////////////////////////////////////////// + // + // Template tokenizer + renderer. + // + + void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth) + { + if (Depth >= kMaxRecursionDepth) + { + Out << "…"sv; + return; + } + + size_t i = 0; + while (i < Template.size()) + { + const char C = Template[i]; + + // Localized escape: ` followed by {, }, or ` → literal. + if (Localized && C == '`' && i + 1 < Template.size()) + { + const char Next = Template[i + 1]; + if (Next == '{' || Next == '}' || Next == '`') + { + Out << Next; + i += 2; + continue; + } + } + + // Non-localized escape: {{ or }} → literal { or }. + if (!Localized && C == '{' && i + 1 < Template.size() && Template[i + 1] == '{') + { + Out << '{'; + i += 2; + continue; + } + if (!Localized && C == '}' && i + 1 < Template.size() && Template[i + 1] == '}') + { + Out << '}'; + i += 2; + continue; + } + + if (C == '{') + { + // Placeholder: scan until matching '}'. + const size_t End = Template.find('}', i + 1); + if (End == std::string_view::npos) + { + // Unterminated placeholder — emit the rest literally so we + // don't silently drop data. UE would have asserted at emit. + Out << Template.substr(i); + return; + } + const std::string_view Path = Template.substr(i + 1, End - i - 1); + const CbFieldView Resolved = ResolvePath(Fields, Path); + if (Resolved) + { + RenderValue(Resolved, Out, Localized, Depth + 1); + } + // Missing placeholder: emit nothing. (UE asserts at emit time, + // so in well-formed input this never fires.) + i = End + 1; + continue; + } + + Out << C; + ++i; + } + } + +} // namespace + +void +RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized) +{ + RenderTemplateInto(Template, Fields, Out, Localized, 0); +} + +} // namespace zen diff --git a/src/zenserver/sessions/logtemplate.h b/src/zenserver/sessions/logtemplate.h new file mode 100644 index 000000000..e8b07e63d --- /dev/null +++ b/src/zenserver/sessions/logtemplate.h @@ -0,0 +1,42 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/string.h> + +#include <string_view> + +namespace zen { + +/// Render a UE structured-log template (as produced by UE_LOGFMT) against a +/// fields bag, writing the result into a caller-provided builder. Grammar: +/// +/// format := (text | escape | placeholder)* +/// escape := '{{' | '}}' (non-localized, default) +/// | '`{' | '`}' | '``' (localized, $locformat) +/// placeholder := '{' field_path '}' +/// field_path := name ('.' name | '[' digits ']')* +/// name := [A-Za-z0-9] [A-Za-z0-9_]* (leading '_' reserved) +/// +/// There are NO inline format specs — `{Name:spec}` is not part of the +/// grammar. Formatting control lives on the value side via nested +/// objects carrying `$text` / `$format` / `$locformat` (see the value- +/// rendering rules in logtemplate.cpp). +/// +/// Missing paths render as empty (UE asserts these at emit time, so in +/// practice they don't occur; the empty-render is defensive). Unknown +/// primitive CbField types render as empty. +/// +/// Typical use: pass an `ExtendableStringBuilder<256>` so typical messages +/// render on the stack with no heap allocation. The builder is appended to, +/// not cleared, so callers can compose multiple writes if they want. +/// +/// @param Template The format string. +/// @param Fields The top-level fields bag referenced by placeholders. +/// @param Out Builder to append the rendered text to. +/// @param Localized True for $locformat templates (backtick escapes); +/// false (default) for the top-level `format` field. +void RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized = false); + +} // namespace zen diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp index 9d4e3120c..470117c6a 100644 --- a/src/zenserver/sessions/sessions.cpp +++ b/src/zenserver/sessions/sessions.cpp @@ -3,59 +3,733 @@ #include "sessions.h" #include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <mutex> + namespace zen { using namespace std::literals; +namespace { + + // Per-session log file layout: + // [LogFileHeader][Record]* + // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength]. + // Records are written in order. A partial trailing record (short write after + // crash) is ignored on load. + constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG' + constexpr uint32_t kLogFileVersion = 1; + +#pragma pack(push, 1) + struct LogFileHeader + { + uint32_t Magic; + uint32_t Version; + }; +#pragma pack(pop) + + constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader); + + void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry) + { + Writer << "ts" << Entry.Timestamp; + if (Entry.Level != logging::Off) + { + // Store as a small integer (int8_t range) rather than a string — + // fixed-width, one byte in the serialized CbObject. + Writer << "lvl" << static_cast<int32_t>(Entry.Level); + } + if (!Entry.LoggerName.empty()) + { + Writer << "cat" << Entry.LoggerName; + } + if (!Entry.Message.empty()) + { + Writer << "msg" << Entry.Message; + } + // Structured-log template + fields. Only present for UE_LOGFMT-shaped + // entries; Message already holds the rendered text for those so a + // reader that ignores these two can still display the line. + if (!Entry.Format.empty()) + { + Writer << "fmt" << Entry.Format; + if (Entry.Fields.GetSize() > 0) + { + Writer.AddObject("flds"sv, Entry.Fields); + } + } + } + + // Parse a serialized record into an input form. The string_views in + // the result borrow from `Obj`'s underlying buffer; the caller must + // keep that buffer alive (or arena-copy via PreloadEntries) before + // the views are used. + bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput) + { + CbFieldView TsField = Obj["ts"sv]; + if (!TsField) + { + return false; + } + OutInput.Timestamp = TsField.AsDateTime(); + + // New format: integer. Legacy format (pre-refactor log.bin files on + // this same branch): string. Accept either so existing persisted + // entries keep their level when loaded. + CbFieldView LvlField = Obj["lvl"sv]; + if (LvlField.IsInteger()) + { + const int32_t Lvl = LvlField.AsInt32(); + if (Lvl >= 0 && Lvl < logging::LogLevelCount) + { + OutInput.Level = static_cast<logging::LogLevel>(Lvl); + } + } + else if (LvlField.IsString()) + { + OutInput.Level = logging::ParseLogLevelString(LvlField.AsString()); + } + + OutInput.LoggerName = Obj["cat"sv].AsString(); + OutInput.Message = Obj["msg"sv].AsString(); + OutInput.Format = Obj["fmt"sv].AsString(); + if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0) + { + OutInput.Fields = CbObject::Clone(FieldsView); + } + return true; + } + + void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info) + { + Writer << "id" << Info.Id; + if (!Info.AppName.empty()) + { + Writer << "app" << Info.AppName; + } + if (!Info.Mode.empty()) + { + Writer << "mode" << Info.Mode; + } + if (!Info.Platform.empty()) + { + Writer << "plat" << Info.Platform; + } + if (Info.ClientPid != 0) + { + Writer << "pid" << Info.ClientPid; + } + if (Info.ParentSessionId != Oid::Zero) + { + Writer << "parent_session_id" << Info.ParentSessionId; + } + if (Info.JobId != Oid::Zero) + { + Writer << "jobid" << Info.JobId; + } + Writer << "ca" << Info.CreatedAt; + Writer << "ua" << Info.UpdatedAt; + if (Info.EndedAt.GetTicks() != 0) + { + Writer << "ea" << Info.EndedAt; + } + if (Info.Metadata.GetSize() > 0) + { + Writer.AddObject("meta"sv, Info.Metadata); + } + } + + bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo) + { + OutInfo.Id = Obj["id"sv].AsObjectId(); + if (OutInfo.Id == Oid::Zero) + { + return false; + } + OutInfo.AppName = std::string(Obj["app"sv].AsString()); + OutInfo.Mode = std::string(Obj["mode"sv].AsString()); + OutInfo.Platform = std::string(Obj["plat"sv].AsString()); + OutInfo.ClientPid = Obj["pid"sv].AsUInt32(); + OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId(); + OutInfo.JobId = Obj["jobid"sv].AsObjectId(); + OutInfo.CreatedAt = Obj["ca"sv].AsDateTime(); + OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime(); + OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0}); + CbObjectView MetaView = Obj["meta"sv].AsObjectView(); + if (MetaView.GetSize() > 0) + { + OutInfo.Metadata = CbObject::Clone(MetaView); + } + return true; + } + + std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); } + +#if ZEN_PLATFORM_WINDOWS + // Turn a Windows process exit code into a human-friendly termination + // reason. Most abnormal terminations surface as NTSTATUS values (high + // bit set); the ones below are what you'll actually encounter in the + // wild. Anything we don't recognize falls through to a formatted hex + // (NTSTATUS-shaped) or decimal (application-level) exit code. + std::string DescribeWindowsExitCode(uint32_t ExitCode) + { + struct Named + { + uint32_t Code; + std::string_view Name; + }; + using namespace std::literals; + static constexpr Named kKnown[] = { + {0xC000013Au, "interrupted (Ctrl-C)"sv}, + {0xC0000005u, "access violation"sv}, + {0xC000001Du, "illegal instruction"sv}, + {0xC0000094u, "integer divide by zero"sv}, + {0xC0000096u, "privileged instruction"sv}, + {0xC00000FDu, "stack overflow"sv}, + {0xC0000409u, "stack buffer overrun"sv}, + {0xC0000374u, "heap corruption"sv}, + {0xC0000135u, "DLL not found"sv}, + {0xC0000142u, "DLL initialization failed"sv}, + {0xC000007Bu, "invalid image format"sv}, + {0xC0000420u, "assertion failure"sv}, + {0xC0000008u, "invalid handle"sv}, + {0xC000008Eu, "float divide by zero"sv}, + {0xC0000091u, "float overflow"sv}, + {0xC0000093u, "float underflow"sv}, + {0x80000003u, "breakpoint"sv}, + {0x40000015u, "fatal app exit"sv}, + }; + for (const Named& Entry : kKnown) + { + if (Entry.Code == ExitCode) + { + return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode); + } + } + // NTSTATUS-shaped codes have the high bit set; show them as hex so + // they're recognizable (and matchable against Microsoft's doc tables). + if ((ExitCode & 0x80000000u) != 0) + { + return fmt::format("process exited (exit code 0x{:08X})", ExitCode); + } + return fmt::format("process exited (exit code {})", ExitCode); + } +#endif + +} // namespace + class SessionLog : public TRefCounted<SessionLog> { public: - SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); } + explicit SessionLog(std::filesystem::path LogFilePath); + + void Append(const SessionsService::LogEntryInput& Entry); + + // LoadTail returns input-form entries: their string_views borrow + // from m_OwnedBuffers (held internally) so the caller's PreloadEntries + // can intern/copy them into the session arena before the buffers are + // dropped at LoadResult destruction. + struct LoadResult + { + std::vector<SessionsService::LogEntryInput> TailEntries; + // Backing memory for the views in TailEntries. Each ParsedRecord + // keeps a CbObject alive whose payload bytes back the strings. + std::vector<CbObject> OwnedBuffers; + uint64_t TotalCount = 0; + }; + LoadResult LoadTail(size_t MaxEntries); private: - BasicFile m_LogFile; + static LoggerRef Log() + { + static LoggerRef L(logging::Get("sessions")); + return L; + } + + std::filesystem::path m_Path; + std::mutex m_Mutex; + BasicFile m_File; + uint64_t m_WriteOffset = 0; + bool m_Enabled = false; }; -class SessionLogStore +SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath)) { -public: - SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {} + std::error_code Ec; + std::filesystem::create_directories(m_Path.parent_path(), Ec); - ~SessionLogStore() = default; + m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec); + if (Ec) + { + ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message()); + return; + } - Ref<SessionLog> GetLogForSession(const Oid& SessionId) + const uint64_t Size = m_File.FileSize(Ec); + if (Ec) { - // For now, just return a new log for each session. We can implement actual log storage and retrieval later. - return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + m_File.Close(); + ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message()); + return; } - Ref<SessionLog> CreateLogForSession(const Oid& SessionId) + LogFileHeader Header{}; + bool NeedsInit = Size < kLogFileHeaderSize; + if (!NeedsInit) + { + // Read is throwing-only; guard so a read failure doesn't escape. + try + { + m_File.Read(&Header, sizeof(Header), 0); + } + catch (const std::exception& E) + { + ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what()); + NeedsInit = true; + } + if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion)) + { + NeedsInit = true; + } + } + + if (NeedsInit) + { + m_File.SetFileSize(0); + Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion}; + m_File.Write(&Header, sizeof(Header), 0, Ec); + if (Ec) + { + m_File.Close(); + ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message()); + return; + } + m_WriteOffset = kLogFileHeaderSize; + } + else + { + m_WriteOffset = Size; + } + + m_Enabled = true; +} + +void +SessionLog::Append(const SessionsService::LogEntryInput& Entry) +{ + if (!m_Enabled) { - // For now, just return a new log for each session. We can implement actual log storage and retrieval later. - return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + return; } + CbObjectWriter Writer; + WriteLogEntryFields(Writer, Entry); + CbObject Obj = Writer.Save(); + + // Write directly from the CbObject's owned buffer — no need to allocate + // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write. + const MemoryView View = Obj.GetView(); + const uint64_t ObjSize = View.GetSize(); + if (ObjSize == 0 || ObjSize > std::numeric_limits<uint32_t>::max()) + { + return; + } + const uint32_t Len = static_cast<uint32_t>(ObjSize); + + std::lock_guard<std::mutex> Lock(m_Mutex); + std::error_code Ec; + m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec); + if (Ec) + { + return; + } + m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec); + if (Ec) + { + return; + } + m_WriteOffset += sizeof(Len) + ObjSize; +} + +SessionLog::LoadResult +SessionLog::LoadTail(size_t MaxEntries) +{ + std::lock_guard<std::mutex> Lock(m_Mutex); + LoadResult Result; + + if (!m_Enabled) + { + return Result; + } + + std::error_code Ec; + const uint64_t Size = m_File.FileSize(Ec); + if (Ec || Size <= kLogFileHeaderSize) + { + return Result; + } + + IoBuffer Buffer; + try + { + Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize); + } + catch (const std::exception& E) + { + ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what()); + return Result; + } + const uint8_t* Data = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + const uint64_t DataSize = Buffer.GetSize(); + + // Walk all valid record positions, ignoring any partial trailing record. + struct RecRef + { + const uint8_t* Ptr; + uint32_t Len; + }; + std::vector<RecRef> Records; + uint64_t Pos = 0; + while (Pos + sizeof(uint32_t) <= DataSize) + { + uint32_t RecLen = 0; + std::memcpy(&RecLen, Data + Pos, sizeof(RecLen)); + const uint64_t Next = Pos + sizeof(uint32_t) + RecLen; + if (RecLen == 0 || Next > DataSize) + { + break; + } + Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen}); + Pos = Next; + } + + // Parse every record so TotalCount reflects how many actually decode + // — cursors anchor to this number, and counting CB-corrupt records + // would shift subsequent cursor math off. Only the trailing window + // of size MaxEntries is materialized into TailEntries; head records + // are parsed purely for the count and discarded. + const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0; + Result.TailEntries.reserve(Records.size() - TailStart); + Result.OwnedBuffers.reserve(Records.size() - TailStart); + + for (size_t i = 0; i < Records.size(); ++i) + { + try + { + IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len)); + CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf)); + + SessionsService::LogEntryInput Input; + if (ReadLogEntry(Obj, Input)) + { + ++Result.TotalCount; + if (i >= TailStart) + { + // Keep the CbObject alive for as long as the views + // in the input are needed — PreloadEntries will copy + // the strings into the session arena and we drop the + // buffer set when LoadResult is destroyed. + Result.TailEntries.push_back(std::move(Input)); + Result.OwnedBuffers.push_back(std::move(Obj)); + } + } + } + catch (const std::exception&) + { + // Skip malformed record — does not contribute to TotalCount. + } + } + + return Result; +} + +////////////////////////////////////////////////////////////////////////// + +class SessionLogStore +{ +public: + explicit SessionLogStore(std::filesystem::path StoragePath); + + Ref<SessionLog> GetOrCreateLogForSession(const Oid& SessionId); + void WriteSessionInfoFile(const SessionsService::SessionInfo& Info); + void DeleteSession(const Oid& SessionId); + uint64_t GetSessionSize(const Oid& SessionId) const; + + struct PersistedSession + { + SessionsService::SessionInfo Info; + std::filesystem::path LogPath; + }; + std::vector<PersistedSession> Scan() const; + private: + static LoggerRef Log() + { + static LoggerRef L(logging::Get("sessions")); + return L; + } + std::filesystem::path m_StoragePath; }; -SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info) +SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) { + std::error_code Ec; + std::filesystem::create_directories(m_StoragePath, Ec); +} + +Ref<SessionLog> +SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId) +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + std::filesystem::create_directories(Dir, Ec); + return Ref(new SessionLog(Dir / "log.bin")); +} + +void +SessionLogStore::DeleteSession(const Oid& SessionId) +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + std::filesystem::remove_all(Dir, Ec); + if (Ec) + { + ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message()); + } +} + +uint64_t +SessionLogStore::GetSessionSize(const Oid& SessionId) const +{ + const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId); + std::error_code Ec; + uint64_t Total = 0; + std::filesystem::directory_iterator It{Dir, Ec}; + if (Ec) + { + return 0; + } + for (const std::filesystem::directory_entry& Entry : It) + { + std::error_code FileEc; + if (Entry.is_regular_file(FileEc)) + { + const uintmax_t Size = Entry.file_size(FileEc); + if (!FileEc) + { + Total += uint64_t(Size); + } + } + } + return Total; } -SessionsService::Session::~Session() = default; void -SessionsService::Session::AppendLog(LogEntry Entry) +SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info) { + const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id); + std::error_code Ec; + std::filesystem::create_directories(Dir, Ec); + + CbObjectWriter Writer; + WriteSessionInfoFields(Writer, Info); + CbObject Obj = Writer.Save(); + const MemoryView View = Obj.GetView(); + if (View.GetSize() == 0) + { + return; + } + TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec); +} + +std::vector<SessionLogStore::PersistedSession> +SessionLogStore::Scan() const +{ + std::vector<PersistedSession> Result; + std::error_code Ec; + + if (!std::filesystem::exists(m_StoragePath, Ec)) + { + return Result; + } + + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec}) + { + if (Ec || !Entry.is_directory(Ec)) + { + continue; + } + + const std::filesystem::path InfoPath = Entry.path() / "info.cb"; + const std::filesystem::path LogPath = Entry.path() / "log.bin"; + + if (!std::filesystem::exists(InfoPath, Ec)) + { + continue; + } + + try + { + BasicFile InfoFile; + std::error_code OpenEc; + InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc); + if (OpenEc) + { + continue; + } + IoBuffer InfoBuf = InfoFile.ReadAll(); + if (InfoBuf.GetSize() == 0) + { + continue; + } + CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf)); + PersistedSession PS{.Info = SessionsService::SessionInfo{ + .Id = Oid::Zero, + .CreatedAt = DateTime{0}, + .UpdatedAt = DateTime{0}, + }}; + if (!ReadSessionInfo(InfoObj, PS.Info)) + { + continue; + } + PS.LogPath = LogPath; + Result.push_back(std::move(PS)); + } + catch (const std::exception& E) + { + ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what()); + } + } + + return Result; +} + +////////////////////////////////////////////////////////////////////////// + +SessionsService::Session::Session(const SessionInfo& Info, Ref<SessionLog> Log, ProcessHandle ClientProcess) +: m_Info(Info) +, m_Log(std::move(Log)) +, m_ClientProcess(std::move(ClientProcess)) +{ +} +SessionsService::Session::~Session() = default; + +const char* +SessionsService::Session::InternLoggerNameLocked(std::string_view Name) +{ + if (Name.empty()) + { + return ""; + } + if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end()) + { + return It->second; + } + const char* Arena = m_LogArena.DuplicateString(Name); + // The map's key view borrows from Arena (which lives as long as the + // session does), so it's safe to outlive `Name`. + m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena); + return Arena; +} + +const char* +SessionsService::Session::AllocateLogStringLocked(std::string_view Str) +{ + if (Str.empty()) + { + return ""; + } + return m_LogArena.DuplicateString(Str); +} + +uint64_t +SessionsService::Session::AppendLog(LogEntryInput Input) +{ + // Persist first (outside the deque lock ordering) so disk I/O doesn't + // starve cursor readers holding the shared lock. The SessionLog has its + // own internal mutex that serializes file writes. + if (m_Log) + { + m_Log->Append(Input); + } + RwLock::ExclusiveLockScope Lock(m_LogLock); - m_LogEntries.push_back(std::move(Entry)); - ++m_TotalAppended; + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = std::move(Input.Fields); + const uint64_t NewCursor = ++m_TotalAppended; while (m_LogEntries.size() > MaxLogEntries) { m_LogEntries.pop_front(); } + return NewCursor; +} + +uint64_t +SessionsService::Session::AppendLogBatch(std::span<LogEntryInput> Inputs) +{ + if (Inputs.empty()) + { + return 0; + } + + // Persist first (per-entry; SessionLog's internal mutex serializes + // these writes). We do this outside m_LogLock so file I/O doesn't + // stall cursor readers. + if (m_Log) + { + for (LogEntryInput& Input : Inputs) + { + m_Log->Append(Input); + } + } + + RwLock::ExclusiveLockScope Lock(m_LogLock); + for (LogEntryInput& Input : Inputs) + { + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = std::move(Input.Fields); + ++m_TotalAppended; + } + while (m_LogEntries.size() > MaxLogEntries) + { + m_LogEntries.pop_front(); + } + return m_TotalAppended; +} + +void +SessionsService::Session::PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount) +{ + RwLock::ExclusiveLockScope Lock(m_LogLock); + m_LogEntries.clear(); + for (const LogEntryInput& Input : Tail) + { + m_LogEntries.emplace_back(); + LogEntry& Entry = m_LogEntries.back(); + Entry.Timestamp = Input.Timestamp; + Entry.Level = Input.Level; + Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName); + Entry.Message = AllocateLogStringLocked(Input.Message); + Entry.Format = AllocateLogStringLocked(Input.Format); + Entry.Fields = Input.Fields; + } + m_TotalAppended = TotalCount; } std::vector<SessionsService::LogEntry> @@ -118,17 +792,134 @@ SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const }; } +uint64_t +SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input) +{ + // Resolve the session without holding any external lock — GetSession + // acquires m_Lock shared briefly and returns a ref-counted handle. + Ref<Session> Target = GetSession(SessionId); + if (!Target) + { + return 0; + } + + const uint64_t NewCursor = Target->AppendLog(std::move(Input)); + + // Fire after Session::m_LogLock is released (inside AppendLog) so the + // callback can safely call back into this service (e.g. to resolve + // the session again and fetch the delta for its subscribers) without + // any nested-lock concerns. + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, NewCursor); + } + return NewCursor; +} + +uint64_t +SessionsService::AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs) +{ + if (Inputs.empty()) + { + return 0; + } + Ref<Session> Target = GetSession(SessionId); + if (!Target) + { + return 0; + } + + const uint64_t NewCursor = Target->AppendLogBatch(Inputs); + + // One callback fires for the whole batch — subscribers see all + // entries in a single delta rather than N separate deltas. Fired + // after Session::m_LogLock is released for the same reason as the + // single-entry path. + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, NewCursor); + } + return NewCursor; +} + +void +SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback) +{ + m_LogAppendedCallback = std::move(Callback); +} + ////////////////////////////////////////////////////////////////////////// -SessionsService::SessionsService() : m_Log(logging::Get("sessions")) +SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions")) { + if (StorageRoot.empty()) + { + return; + } + + m_SessionLogs = std::make_unique<SessionLogStore>(StorageRoot); + + // Load all previously-persisted sessions as ended sessions. Their log + // tails are preloaded into the in-memory deque so the UI can view them. + std::vector<SessionLogStore::PersistedSession> Persisted = m_SessionLogs->Scan(); + for (SessionLogStore::PersistedSession& PS : Persisted) + { + // Sessions that were active at shutdown need a synthetic ended time so + // they sort correctly in the UI. + if (PS.Info.EndedAt.GetTicks() == 0) + { + PS.Info.EndedAt = PS.Info.UpdatedAt; + } + + Ref<Session> S = Ref(new Session(PS.Info)); + + // Load the log tail (no SessionLog attached — historical sessions do + // not receive new appends and the file handle is released here). + // PreloadEntries copies/interns each input's strings into the + // session's arena, after which Loaded.OwnedBuffers can be + // released along with the rest of the LoadResult. + SessionLog Reader(PS.LogPath); + SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries); + S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount); + + m_EndedSessions.push_back(std::move(S)); + } + + ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot); } SessionsService::~SessionsService() = default; bool -SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata) +SessionsService::RegisterSession(const Oid& SessionId, + std::string AppName, + std::string Mode, + std::string Platform, + uint32_t ClientPid, + const Oid& ParentSessionId, + const Oid& JobId, + CbObjectView Metadata) { + // Open a process handle eagerly — BEFORE any pid-reuse window opens. On + // Windows the handle is tied to the specific process instance, so a + // later pid recycle can't fool later liveness checks. Do the syscall + // outside the service lock (it's a kernel round-trip). On POSIX this is + // effectively a no-op (just stores the pid). + ProcessHandle ClientProcess; + if (ClientPid != 0) + { + std::error_code Ec; + ClientProcess.Initialize(static_cast<int>(ClientPid), Ec); + if (Ec) + { + ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled", + SessionId, + ClientPid, + Ec.message()); + } + } + + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; // Log outside the lock scope - InProcSessionLogSink calls back into // GetSession() which acquires m_Lock shared, so logging while holding // m_Lock exclusively would deadlock. @@ -141,35 +932,69 @@ SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std: } const DateTime Now = DateTime::Now(); - m_Sessions.emplace(SessionId, - Ref(new Session(SessionInfo{.Id = SessionId, - .AppName = AppName, - .Mode = Mode, - .JobId = JobId, - .Metadata = CbObject::Clone(Metadata), - .CreatedAt = Now, - .UpdatedAt = Now}))); + SessionInfo Info{.Id = SessionId, + .AppName = AppName, + .Mode = Mode, + .Platform = Platform, + .ClientPid = ClientPid, + .ParentSessionId = ParentSessionId, + .JobId = JobId, + .Metadata = CbObject::Clone(Metadata), + .CreatedAt = Now, + .UpdatedAt = Now}; + + Ref<SessionLog> Log; + if (m_SessionLogs) + { + Log = m_SessionLogs->GetOrCreateLogForSession(SessionId); + } + m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess)))); + PersistedInfo = std::move(Info); + } + + if (m_SessionLogs) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); } - ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId); + // Include the tracked pid so the log makes it obvious whether the client + // opted into liveness tracking (and whether our OpenProcess succeeded). + // "pid: 0" = no liveness tracking (remote or client didn't report); + // "pid: N" with an immediately-prior warning = OpenProcess failed. + ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})", + SessionId, + AppName, + Mode, + Platform, + ClientPid, + ParentSessionId, + JobId); return true; } bool SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata) { - RwLock::ExclusiveLockScope Lock(m_Lock); - - auto It = m_Sessions.find(SessionId); - if (It == m_Sessions.end()) + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; { - return false; + RwLock::ExclusiveLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return false; + } + + It.value()->UpdateMetadata(Metadata); + PersistedInfo = It.value()->Info(); } - It.value()->UpdateMetadata(Metadata); + if (m_SessionLogs) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); + } - const SessionInfo& Info = It.value()->Info(); - ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId); + ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId); return true; } @@ -178,13 +1003,23 @@ SessionsService::GetSession(const Oid& SessionId) const { RwLock::SharedLockScope Lock(m_Lock); - auto It = m_Sessions.find(SessionId); - if (It == m_Sessions.end()) + if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end()) { - return {}; + return It->second; + } + + // Fall back to ended sessions so HTTP consumers can fetch logs/metadata + // for sessions that have finished (including sessions loaded from disk on + // startup as historical ended sessions). + for (const Ref<Session>& Ended : m_EndedSessions) + { + if (Ended->Info().Id == SessionId) + { + return Ended; + } } - return It->second; + return {}; } std::vector<Ref<SessionsService::Session>> @@ -202,10 +1037,14 @@ SessionsService::GetSessions() const } bool -SessionsService::RemoveSession(const Oid& SessionId) +SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason) { - std::string RemovedAppName; - Oid RemovedJobId; + std::string RemovedAppName; + Oid RemovedJobId; + SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}}; + bool Persist = false; + Ref<Session> Ended; + const DateTime EndTime = DateTime::Now(); { RwLock::ExclusiveLockScope Lock(m_Lock); @@ -219,13 +1058,50 @@ SessionsService::RemoveSession(const Oid& SessionId) RemovedAppName = It.value()->Info().AppName; RemovedJobId = It.value()->Info().JobId; - Ref<Session> Ended = It.value(); - Ended->SetEndedAt(DateTime::Now()); - m_EndedSessions.push_back(std::move(Ended)); + Ended = It.value(); + Ended->SetEndedAt(EndTime); + if (m_SessionLogs) + { + PersistedInfo = Ended->Info(); + Persist = true; + } + m_EndedSessions.push_back(Ended); m_Sessions.erase(It); } + // Synthetic "Session ended" entry is appended *after* m_Lock is + // released. AppendLog hits disk via SessionLog::Append, so doing it + // inside the exclusive scope would block every other session lookup + // / registration / list while the I/O completes. The callback that + // pushes the delta to WS subscribers also fires from outside the + // lock — same self-deadlock concern as the GetSession path. + uint64_t SyntheticEndCursor = 0; + { + ExtendableStringBuilder<128> MsgBuilder; + MsgBuilder << "Session ended"sv; + if (!Reason.empty()) + { + MsgBuilder << ": "sv << Reason; + } + SyntheticEndCursor = Ended->AppendLog(LogEntryInput{ + .Timestamp = EndTime, + .Level = logging::Info, + .LoggerName = "sessions"sv, + .Message = MsgBuilder.ToView(), + }); + } + + if (m_LogAppendedCallback) + { + m_LogAppendedCallback(SessionId, SyntheticEndCursor); + } + + if (Persist) + { + m_SessionLogs->WriteSessionInfoFile(PersistedInfo); + } + ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId); return true; } @@ -244,4 +1120,202 @@ SessionsService::GetSessionCount() const return m_Sessions.size(); } +// The "when was this session last relevant" timestamp used for age checks +// and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt. +static DateTime +ReferenceTime(const SessionsService::SessionInfo& Info) +{ + return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt; +} + +size_t +SessionsService::CheckProcessLiveness() +{ + // Snapshot active sessions under a shared lock, then probe liveness and + // drop dead ones without holding the service lock (IsRunning() is a + // kernel round-trip and RemoveSession() takes the lock exclusively). + std::vector<Ref<Session>> Candidates; + { + RwLock::SharedLockScope Lock(m_Lock); + Candidates.reserve(m_Sessions.size()); + for (const auto& [Id, SessionRef] : m_Sessions) + { + if (SessionRef->GetClientProcess().IsValid()) + { + Candidates.push_back(SessionRef); + } + } + } + + size_t Ended = 0; + for (const Ref<Session>& S : Candidates) + { + // m_ClientProcess is set once at construction and never mutated, so + // reading it here without synchronization is safe. + if (!S->GetClientProcess().IsRunning()) + { + // Build the termination reason. On Windows, GetExitCode() returns + // the real OS exit code and is cheap right after !IsRunning(); we + // map the common NTSTATUS codes (Ctrl-C, access violation, DLL + // init failure, …) to human-readable names. On POSIX the exit + // code is only populated via Wait() which we never call, so + // stick with the plain reason. + std::string Reason = "process exited"; +#if ZEN_PLATFORM_WINDOWS + const uint32_t ExitCode = static_cast<uint32_t>(S->GetClientProcess().GetExitCode()); + if (ExitCode != 0) + { + Reason = DescribeWindowsExitCode(ExitCode); + } +#endif + if (RemoveSession(S->Info().Id, Reason)) + { + ++Ended; + } + } + } + return Ended; +} + +SessionsService::PruneResult +SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes) +{ + PruneResult Result; + std::vector<Oid> ToDelete; + + // Phase 1: age + count pruning (fast; purely in-memory). + { + RwLock::ExclusiveLockScope Lock(m_Lock); + + const uint64_t NowTicks = DateTime::Now().GetTicks(); + const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0; + const DateTime Cutoff{CutoffTicks}; + + // Age-based pruning: drop ended sessions whose reference time is + // older than Cutoff. + auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) { + if (ReferenceTime(S->Info()) < Cutoff) + { + ToDelete.push_back(S->Info().Id); + return true; + } + return false; + }); + Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt); + m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end()); + + // Count-based pruning: keep at most MaxCount sessions total. Active + // sessions are never touched; if there are already >= MaxCount active + // sessions then all ended sessions get evicted. + const size_t ActiveCount = m_Sessions.size(); + const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0; + if (m_EndedSessions.size() > EndedTarget) + { + const size_t ToRemove = m_EndedSessions.size() - EndedTarget; + // Move the `ToRemove` oldest entries to the front. + std::partial_sort( + m_EndedSessions.begin(), + m_EndedSessions.begin() + ToRemove, + m_EndedSessions.end(), + [](const Ref<Session>& A, const Ref<Session>& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); }); + for (size_t i = 0; i < ToRemove; ++i) + { + ToDelete.push_back(m_EndedSessions[i]->Info().Id); + } + m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove); + Result.ExpiredByCount = ToRemove; + } + } + + // Phase 1 disk deletion, outside the service lock. + if (m_SessionLogs) + { + for (const Oid& Id : ToDelete) + { + m_SessionLogs->DeleteSession(Id); + } + } + + // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions + // (id + reference time) under a shared lock, then stat each directory + // outside the lock so we don't hold writers off during filesystem calls. + if (!m_SessionLogs) + { + return Result; + } + + struct Candidate + { + Oid Id; + DateTime RefTime; + uint64_t Size = 0; + }; + std::vector<Candidate> Candidates; + { + RwLock::SharedLockScope Lock(m_Lock); + Candidates.reserve(m_EndedSessions.size()); + for (const Ref<Session>& S : m_EndedSessions) + { + Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())}); + } + } + + uint64_t TotalBytes = 0; + for (Candidate& C : Candidates) + { + C.Size = m_SessionLogs->GetSessionSize(C.Id); + TotalBytes += C.Size; + } + + if (TotalBytes <= MaxStorageBytes) + { + return Result; + } + + // Oldest first so we evict in chronological order. + std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; }); + + std::vector<Oid> StorageDelete; + uint64_t Reclaimed = 0; + const uint64_t NeedBytes = TotalBytes - MaxStorageBytes; + for (const Candidate& C : Candidates) + { + if (Reclaimed >= NeedBytes) + { + break; + } + StorageDelete.push_back(C.Id); + Reclaimed += C.Size; + } + + if (StorageDelete.empty()) + { + return Result; + } + + // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession + // calls between the snapshot and here will have inserted new entries at + // the back, which we safely leave alone. + { + RwLock::ExclusiveLockScope Lock(m_Lock); + tsl::robin_map<Oid, uint8_t, Oid::Hasher> IdSet; + for (const Oid& Id : StorageDelete) + { + IdSet[Id] = 1; + } + auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) { + return IdSet.contains(S->Info().Id); + }); + m_EndedSessions.erase(It, m_EndedSessions.end()); + } + + for (const Oid& Id : StorageDelete) + { + m_SessionLogs->DeleteSession(Id); + } + Result.ExpiredByStorage = StorageDelete.size(); + + return Result; +} + } // namespace zen diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h index a84ca6506..a722704e0 100644 --- a/src/zenserver/sessions/sessions.h +++ b/src/zenserver/sessions/sessions.h @@ -4,6 +4,8 @@ #include <zencore/compactbinary.h> #include <zencore/logbase.h> +#include <zencore/memory/memoryarena.h> +#include <zencore/process.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -11,7 +13,10 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/deque.h> #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <filesystem> +#include <functional> #include <optional> +#include <span> #include <string> #include <vector> @@ -34,25 +39,62 @@ public: Oid Id; std::string AppName; std::string Mode; - Oid JobId; - CbObject Metadata; - DateTime CreatedAt; - DateTime UpdatedAt; - DateTime EndedAt{0}; + std::string Platform; // Reported by the client, e.g. "windows", "linux", "macos" + uint32_t ClientPid = 0; // Non-zero = local PID to probe for liveness. 0 = don't track. + Oid ParentSessionId; + // Optional task/action identifier used to associate this session with a + // specific unit of work. Distinct from ParentSessionId, which records + // process/session ancestry. + Oid JobId; + CbObject Metadata; + DateTime CreatedAt; + DateTime UpdatedAt; + DateTime EndedAt{0}; }; + /// Stored form of a log entry. The string fields are arena-borrowed + /// `const char*` — they live in the owning Session's MemoryArena and + /// are valid only for that Session's lifetime. Default copy is + /// intentionally shallow (string pointers are shared with the source); + /// callers must not let copies outlive the originating Session. + /// + /// Build entries via `LogEntryInput` and route them through + /// `Session::AppendLog` / `AppendLogBatch`, which intern logger names + /// and arena-allocate the other strings before storing. struct LogEntry { - DateTime Timestamp; - std::string Level; - std::string Message; - CbObject Data; + DateTime Timestamp{0}; + // Sentinel: Off means "no level set" (e.g. plain-text POSTed entries + // where the client didn't include a level). Real log entries use + // Trace..Critical, so Off is free to reuse as "omit on serialize". + logging::LogLevel Level = logging::Off; + // Arena pointers (null-terminated). Empty string is the default + // — never null, so callers don't need to guard. + const char* LoggerName = ""; // Interned: one canonical copy per unique name across the session. + const char* Message = ""; // For structured entries: the rendered form (populated at intake). + const char* Format = ""; // UE_LOGFMT template; "" for plain entries. + CbObject Fields; // Present only when Format is non-empty. + }; + + /// Input form used to build an entry on the way into a Session. The + /// string_view fields are caller-borrowed; AppendLog interns/copies + /// them into the Session's arena before any LogEntry is built. Use + /// this struct rather than constructing LogEntry directly so the + /// arena ownership invariant stays one-sided. + struct LogEntryInput + { + DateTime Timestamp{0}; + logging::LogLevel Level = logging::Off; + std::string_view LoggerName; + std::string_view Message; + std::string_view Format; + CbObject Fields; }; class Session : public TRefCounted<Session> { public: - Session(const SessionInfo& Info); + Session(const SessionInfo& Info, Ref<SessionLog> Log = {}, ProcessHandle ClientProcess = {}); ~Session(); Session(Session&&) = delete; @@ -67,12 +109,29 @@ public: void SetEndedAt(DateTime When) { m_Info.EndedAt = When; } - void AppendLog(LogEntry Entry); + /// Appends an entry to the in-memory deque and to the persisted + /// log. Returns the new cursor value (m_TotalAppended post- + /// increment). Logger name is interned, message and format are + /// arena-allocated — the input's string_views may safely be + /// caller-stack-bound. + uint64_t AppendLog(LogEntryInput Input); + + /// Append-many counterpart that takes the deque lock exactly once + /// for the whole batch. Use this when an inbound HTTP POST carries + /// multiple entries — single-lock semantics keep entries from one + /// caller contiguous on the wire even when other appends race in, + /// and the WS-push observer can fire just once for the whole batch. + /// Returns the new cursor (the value at the tail of the batch). + uint64_t AppendLogBatch(std::span<LogEntryInput> Inputs); + std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const; uint64_t GetLogCount() const; /// Returns entries appended after the given cursor and the new cursor value. /// A cursor of 0 returns all entries currently in the deque. + /// The returned LogEntries borrow strings from this Session's + /// arena — callers must hold a Ref<Session> for as long as they + /// keep the result. struct CursorResult { std::vector<LogEntry> Entries; @@ -81,26 +140,118 @@ public: }; CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const; + // Seed this session with pre-existing log entries (e.g. loaded from disk + // on startup). Sets the total-appended counter to reflect what was on + // disk so cursors remain meaningful for historical sessions. The inputs + // are interned/arena-allocated into this session. + void PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount); + + /// Process handle used for client-liveness checks. Acquired at + /// registration time (while the pid is known to refer to the reporting + /// process) and held for the session's lifetime; on Windows this is a + /// real HANDLE tied to the specific process instance and is immune to + /// pid reuse. Invalid (IsValid() == false) for remote sessions or when + /// OpenProcess() failed. Set once at construction — no synchronization + /// needed for readers. + const ProcessHandle& GetClientProcess() const { return m_ClientProcess; } + ProcessHandle& GetClientProcess() { return m_ClientProcess; } + + static constexpr uint32_t MaxLogEntries = 10000; + private: + // Intern a logger name into m_LogArena and return the canonical + // pointer for that name. Subsequent calls with the same string + // return the same pointer. Caller must hold m_LogLock exclusive. + const char* InternLoggerNameLocked(std::string_view Name); + + // Allocate a copy of Str into m_LogArena and return a null- + // terminated pointer. No deduplication. Caller must hold m_LogLock + // exclusive. Empty input returns "" (no allocation). + const char* AllocateLogStringLocked(std::string_view Str); + SessionInfo m_Info; Ref<SessionLog> m_Log; + ProcessHandle m_ClientProcess; mutable RwLock m_LogLock; eastl::deque<LogEntry> m_LogEntries; uint64_t m_TotalAppended = 0; // monotonically increasing counter - - static constexpr uint32_t MaxLogEntries = 10000; + // String storage for the in-memory deque. LoggerName is interned + // (one canonical copy per unique name); Message and Format are + // duplicated per entry. Both die with the Session — so the + // LogEntry pointers do too. Sized to fit a typical session's + // strings in one chunk; spills to additional chunks otherwise. + MemoryArena m_LogArena{4096}; + tsl::robin_map<std::string_view, const char*> m_InternedLoggerNames; }; - SessionsService(); + /// Construct a SessionsService. If StorageRoot is non-empty, session + /// metadata and logs are persisted under that directory (one subdirectory + /// per session id) and previously-persisted sessions are loaded as ended. + explicit SessionsService(std::filesystem::path StorageRoot = {}); ~SessionsService(); - bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata); - bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); - Ref<Session> GetSession(const Oid& SessionId) const; + bool RegisterSession(const Oid& SessionId, + std::string AppName, + std::string Mode, + std::string Platform, + uint32_t ClientPid, + const Oid& ParentSessionId, + const Oid& JobId, + CbObjectView Metadata); + bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); + Ref<Session> GetSession(const Oid& SessionId) const; std::vector<Ref<Session>> GetSessions() const; std::vector<Ref<Session>> GetEndedSessions() const; - bool RemoveSession(const Oid& SessionId); - uint64_t GetSessionCount() const; + /// Ends a session. If Reason is non-empty, a synthetic log line is + /// appended to the session log before it's moved to ended so the + /// historical log has a clear closing event. + bool RemoveSession(const Oid& SessionId, std::string_view Reason = {}); + uint64_t GetSessionCount() const; + + /// Appends a log entry to `SessionId` and, if the session exists, + /// invokes the log-appended callback with the new cursor so downstream + /// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta + /// without polling. Returns the new cursor, or 0 if the session is + /// unknown. Fires the callback AFTER any internal locks are released + /// so the callback can safely call back into this service. + uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input); + + /// Batch counterpart of AppendLog. Atomic with respect to other + /// appends to the same session — entries land contiguously on the + /// wire and persist in order — and fires exactly one push-callback + /// for the whole batch. Empty batches and unknown sessions are + /// no-ops returning 0. + uint64_t AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs); + + /// Observer fired after an entry is appended to any session. Replaces + /// any previously set callback. Pass {} to clear. Only one listener is + /// supported — the single consumer today is the HTTP WebSocket push. + using LogAppendedCallback = std::function<void(const Oid& SessionId, uint64_t NewCursor)>; + void SetLogAppendedCallback(LogAppendedCallback Callback); + + /// Drop ended sessions that are too old, that push us over the count + /// limit, or that push the on-disk footprint over the byte budget, and + /// delete their persisted directories. Active sessions are never + /// pruned. Returns the number removed by each criterion. + struct PruneResult + { + size_t ExpiredByAge = 0; + size_t ExpiredByCount = 0; + size_t ExpiredByStorage = 0; + }; + PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes); + + /// End any active session whose tracked client process is no longer + /// running. Sessions with an invalid ProcessHandle (remote, or + /// OpenProcess failed at registration) are skipped. Returns the number + /// of sessions ended by this pass. + size_t CheckProcessLiveness(); + + // Tuning defaults. Expressed in whole days / bytes so they're easy to + // override from a future command-line flag without touching internals. + static constexpr int kDefaultMaxSessionAgeDays = 365; + static constexpr size_t kDefaultMaxSessionCount = 1000; + static constexpr uint64_t kDefaultMaxStorageBytes = 50ull * 1024 * 1024; // 50 MiB private: LoggerRef& Log() { return m_Log; } @@ -110,6 +261,10 @@ private: tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions; std::vector<Ref<Session>> m_EndedSessions; std::unique_ptr<SessionLogStore> m_SessionLogs; + // Set once at wiring-time (single consumer), never reassigned while + // hot, so no dedicated lock — just a plain member. Copy-on-call + // guards against the theoretical re-register race below. + LogAppendedCallback m_LogAppendedCallback; }; } // namespace zen |