// Copyright Epic Games, Inc. All Rights Reserved. #include "httpsessions.h" #include #include #include #include #include #include "logtemplate.h" #include "sessions.h" ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen { using namespace std::literals; HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions, asio::io_context& IoContext) : m_Log(logging::Get("sessions")) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_Sessions(Sessions) , m_PushTimer(IoContext) , m_CleanupTimer(IoContext) , m_LivenessTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { // Break the callback edge before tearing anything else down so a // late AppendLog on another thread can't fire BroadcastLogAppended // after our subscriber list is gone. m_Sessions.SetLogAppendedCallback({}); m_PushTimer.cancel(); m_CleanupTimer.cancel(); m_LivenessTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } const char* HttpSessionsService::BaseUri() const { return "/sessions/"; } void HttpSessionsService::HandleRequest(HttpServerRequest& Request) { metrics::OperationTiming::Scope $(m_HttpRequests); if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); } } void HttpSessionsService::HandleStatusRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpSessionsService::Status"); CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpSessionsService::HandleStatsRequest(HttpServerRequest& HttpReq) { HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats()); } CbObject HttpSessionsService::CollectStats() { ZEN_TRACE_CPU("SessionsService::Stats"); CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); Cbo.BeginObject("sessions"); { Cbo << "readcount" << m_SessionsStats.SessionReadCount; Cbo << "writecount" << m_SessionsStats.SessionWriteCount; Cbo << "deletecount" << m_SessionsStats.SessionDeleteCount; Cbo << "listcount" << m_SessionsStats.SessionListCount; Cbo << "requestcount" << m_SessionsStats.RequestCount; Cbo << "badrequestcount" << m_SessionsStats.BadRequestCount; Cbo << "count" << m_Sessions.GetSessionCount(); } Cbo.EndObject(); return Cbo.Save(); } uint64_t HttpSessionsService::GetActivityCounter() { return m_HttpRequests.Count(); } void HttpSessionsService::Initialize() { using namespace std::literals; ZEN_INFO("Initializing Sessions Service"); static constexpr AsciiSet ValidHexCharactersSet{"0123456789abcdefABCDEF"}; m_Router.AddMatcher("session_id", [](std::string_view Str) -> bool { return Str.length() == Oid::StringLength && AsciiSet::HasOnly(Str, ValidHexCharactersSet); }); m_Router.RegisterRoute( "list", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{session_id}", [this](HttpRouterRequest& Req) { SessionRequest(Req); }, HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete); m_Router.RegisterRoute( "{session_id}/log", [this](HttpRouterRequest& Req) { SessionLogRequest(Req); }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "ws", [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, HttpVerb::kGet); m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); // Event-driven log push: the service fires this every time an entry // is appended (including the synthetic "session ended" line emitted // by RemoveSession). Subscribers receive a binary CB frame carrying // the delta. Safe to call BroadcastLogAppended from any thread — it // does its own locking and SendBinary is async-queued by the WS // transport. m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); }); EnqueuePushTimer(); // Run a cleanup pass shortly after startup so freshly-loaded historical // data is pruned even if the server doesn't stay up for an hour. m_CleanupTimer.expires_after(std::chrono::seconds(30)); m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } RunCleanup(); EnqueueCleanupTimer(); }); EnqueueLivenessTimer(); } static void WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session) { const SessionsService::SessionInfo& Info = Session.Info(); Writer << "id" << Info.Id; if (!Info.AppName.empty()) { Writer << "appname" << Info.AppName; } if (!Info.Mode.empty()) { Writer << "mode" << Info.Mode; } if (!Info.Platform.empty()) { Writer << "platform" << Info.Platform; } if (Info.ClientPid != 0) { Writer << "pid" << Info.ClientPid; } if (Info.ParentSessionId != Oid::Zero) { Writer << "parent_session_id" << Info.ParentSessionId; } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "created_at" << Info.CreatedAt; Writer << "updated_at" << Info.UpdatedAt; if (Info.EndedAt.GetTicks() != 0) { Writer << "ended_at" << Info.EndedAt; } if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0) { Writer << "log_count" << LogCount; } if (Info.Metadata.GetSize() > 0) { Writer.AddObject("metadata"sv, Info.Metadata); } } CbObject HttpSessionsService::BuildSessionListResponse() { std::vector> Active = m_Sessions.GetSessions(); std::vector> Ended = m_Sessions.GetEndedSessions(); CbObjectWriter Response; if (m_SelfSessionId != Oid::Zero) { Response << "self_id" << m_SelfSessionId; } Response.BeginArray("sessions"); for (const Ref& Session : Active) { Response.BeginObject(); WriteSessionInfo(Response, *Session); Response.EndObject(); } for (const Ref& Session : Ended) { Response.BeginObject(); WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); return Response.Save(); } void HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); m_SessionsStats.SessionListCount++; m_SessionsStats.RequestCount++; HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); std::string_view Status = Params.GetValue("status"sv); std::vector> Sessions; if (Status == "ended"sv) { Sessions = m_Sessions.GetEndedSessions(); } else if (Status == "all"sv) { Sessions = m_Sessions.GetSessions(); std::vector> Ended = m_Sessions.GetEndedSessions(); Sessions.insert(Sessions.end(), Ended.begin(), Ended.end()); } else { Sessions = m_Sessions.GetSessions(); } CbObjectWriter Response; if (m_SelfSessionId != Oid::Zero) { Response << "self_id" << m_SelfSessionId; } Response.BeginArray("sessions"); for (const Ref& Session : Sessions) { Response.BeginObject(); WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } void HttpSessionsService::SessionRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); if (SessionId == Oid::Zero) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid session id '{}'", Req.GetCapture(1))); } m_SessionsStats.RequestCount++; switch (ServerRequest.RequestVerb()) { case HttpVerb::kPost: case HttpVerb::kPut: { CbObject RequestObject = ServerRequest.ReadPayloadObject(); // Render the id into a stack buffer once for any success-reply // paths below — avoids a std::string per POST/PUT. char IdBuf[Oid::StringLength + 1] = {}; SessionId.ToString(IdBuf); const std::string_view IdStr(IdBuf, Oid::StringLength); if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); std::string Mode(RequestObject["mode"sv].AsString()); std::string Platform(RequestObject["platform"sv].AsString()); Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId(); Oid JobId = RequestObject["jobid"sv].AsObjectId(); CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); // Only trust a client-reported pid when the HTTP layer // says the request is local (unix socket or a loopback // TCP peer). A remote client's pid refers to a different // machine's process table — opening a local handle with // it would at best be meaningless, at worst a liveness // false positive. uint32_t ClientPid = 0; if (ServerRequest.IsLocalMachineRequest()) { ClientPid = RequestObject["pid"sv].AsUInt32(); } m_SessionsStats.SessionWriteCount++; if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), std::move(Platform), ClientPid, ParentSessionId, JobId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr); } else { // Already exists - try update instead if (m_Sessions.UpdateSession(SessionId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError); } } else { // PUT - update only m_SessionsStats.SessionWriteCount++; if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView())) { return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Session '{}' not found", SessionId)); } } case HttpVerb::kGet: { m_SessionsStats.SessionReadCount++; Ref Session = m_Sessions.GetSession(SessionId); if (Session) { CbObjectWriter Response; WriteSessionInfo(Response, *Session); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); } case HttpVerb::kDelete: { m_SessionsStats.SessionDeleteCount++; if (m_Sessions.RemoveSession(SessionId, "client request"sv)) { return ServerRequest.WriteResponse(HttpResponseCode::OK); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); } default: { return ServerRequest.WriteResponse(HttpResponseCode::MethodNotAllowed); } } } ////////////////////////////////////////////////////////////////////////// // // Session log // static void WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) { Writer << "timestamp" << Entry.Timestamp; if (Entry.Level != logging::Off) { // Frontend renders on the string form (CSS class derives from it), so // keep the wire format as the canonical lowercase name. Writer << "level" << logging::ToString(Entry.Level); } const std::string_view LoggerName{Entry.LoggerName}; if (!LoggerName.empty()) { Writer << "logger" << LoggerName; } const std::string_view Message{Entry.Message}; if (!Message.empty()) { Writer << "message" << Message; } // Structured-log form alongside the rendered message so a future UI // can offer field-level drill-down without another schema bump. The // existing UI only looks at "message" and is unaffected. const std::string_view Format{Entry.Format}; if (!Format.empty()) { Writer << "format" << Format; if (Entry.Fields.GetSize() > 0) { Writer.AddObject("fields"sv, Entry.Fields); } } } void HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); if (SessionId == Oid::Zero) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid session id '{}'", Req.GetCapture(1))); } m_SessionsStats.RequestCount++; Ref Session = m_Sessions.GetSession(SessionId); if (!Session) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Session '{}' not found", SessionId)); } if (ServerRequest.RequestVerb() == HttpVerb::kPost) { m_SessionsStats.SessionWriteCount++; if (ServerRequest.RequestContentType() == HttpContentType::kText) { // Raw text - split by newlines, one entry per line. Collect // into a batch and append atomically: keeps a single client's // payload contiguous on the wire even when other clients race // in, and fires the WS push observer just once for the whole // batch instead of once per line. IoBuffer Payload = ServerRequest.ReadPayload(); std::string_view Text(reinterpret_cast(Payload.GetData()), Payload.GetSize()); const DateTime Now = DateTime::Now(); // 64 inline slots covers the typical SendLogBatch posting size // (~50) without touching the heap. Spills to heap beyond that. // LogEntryInput's string_views point into the request payload // (Text), which lives for the duration of this handler. eastl::fixed_vector Batch; size_t Pos = 0; while (Pos < Text.size()) { size_t End = Text.find('\n', Pos); if (End == std::string_view::npos) { End = Text.size(); } std::string_view Line = Text.substr(Pos, End - Pos); // Strip trailing \r if (!Line.empty() && Line.back() == '\r') { Line.remove_suffix(1); } if (!Line.empty()) { Batch.push_back(SessionsService::LogEntryInput{ .Timestamp = Now, .Message = Line, }); } Pos = End + 1; } m_Sessions.AppendLogBatch(SessionId, Batch); } else { // Structured log (JSON or CbObject). Accepts a single record // or an "entries" array of records — collect into a batch so // a single POST lands atomically and fires one WS push. CbObject RequestObject = ServerRequest.ReadPayloadObject(); const DateTime Now = DateTime::Now(); // 64 inline slots covers the typical SendLogBatch posting size // (~50) without touching the heap. Spills to heap beyond that. // LogEntryInput's string_views borrow from the parsed // RequestObject's underlying buffer (the logger / message / // format strings on the wire); we keep RequestObject alive // for the whole intake. eastl::fixed_vector Batch; // Stable backing for messages we render from a structured // template. fixed_list never moves nodes on insertion, so // string_views into these strings stay valid until the list // is destroyed at handler exit. 64 inline nodes match the // batch's fixed-vector inline cap; spills to heap if a POST // brings more. eastl::fixed_list RenderedMessages; auto AppendFromObject = [&](CbObjectView Obj) { CbFieldView LevelField = Obj["level"sv]; logging::LogLevel Level = logging::Off; if (LevelField.IsString()) { Level = logging::ParseLogLevelString(LevelField.AsString()); } else if (LevelField.IsInteger()) { int32_t LevelInt = LevelField.AsInt32(); if (LevelInt >= 0 && LevelInt < logging::LogLevelCount) { Level = static_cast(LevelInt); } } const std::string_view LoggerName = Obj["logger"sv].AsString(); // Two entry shapes. Structured entries carry `format` + // `fields` and no `message` — we render the template right // here so the rest of the pipeline (in-memory deque, // persisted log.bin, UI GET response) keeps working the // same way for both shapes. CbFieldView FormatField = Obj["format"sv]; if (FormatField.IsString()) { const std::string_view FormatView = FormatField.AsString(); CbObjectView FieldsView = Obj["fields"sv].AsObjectView(); ExtendableStringBuilder<256> RenderedBuilder; RenderLogTemplate(FormatView, FieldsView, RenderedBuilder); // Anchor the rendered string in the stable list so the // LogEntryInput's view into it stays valid until the // AppendLogBatch call below. RenderedMessages.emplace_back(RenderedBuilder.ToView()); const std::string& StoredRendered = RenderedMessages.back(); Batch.push_back(SessionsService::LogEntryInput{ .Timestamp = Now, .Level = Level, .LoggerName = LoggerName, .Message = StoredRendered, .Format = FormatView, .Fields = CbObject::Clone(FieldsView), }); return; } // Plain entry. Batch.push_back(SessionsService::LogEntryInput{ .Timestamp = Now, .Level = Level, .LoggerName = LoggerName, .Message = Obj["message"sv].AsString(), }); }; CbFieldView EntriesField = RequestObject["entries"sv]; if (EntriesField.IsArray()) { // Pre-reserve so the 50-ish entries from a typical // SendLogBatch don't trigger 4-5 reallocations as the // vector grows. CbArrayView Arr = EntriesField.AsArrayView(); Batch.reserve(Arr.Num()); for (CbFieldView Entry : Arr) { AppendFromObject(Entry.AsObjectView()); } } else { Batch.reserve(1); AppendFromObject(RequestObject); } m_Sessions.AppendLogBatch(SessionId, Batch); } return ServerRequest.WriteResponse(HttpResponseCode::OK); } else { // GET - return log entries m_SessionsStats.SessionReadCount++; HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); // cursor-based retrieval: client passes the cursor from the previous response // and receives only entries appended since then. std::string_view CursorStr = Params.GetValue("cursor"sv); if (!CursorStr.empty()) { const std::optional AfterCursor = ParseInt(CursorStr); if (!AfterCursor) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid 'cursor' parameter"sv); } SessionsService::Session::CursorResult Result = Session->GetLogEntriesAfter(*AfterCursor); CbObjectWriter Response; Response << "cursor" << Result.Cursor; Response << "count" << Result.Count; Response.BeginArray("entries"); for (const SessionsService::LogEntry& Entry : Result.Entries) { Response.BeginObject(); WriteLogEntry(Response, Entry); Response.EndObject(); } Response.EndArray(); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } // Legacy offset/limit retrieval uint32_t Limit = 0; uint32_t Offset = 0; if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty()) { const std::optional Parsed = ParseInt(LimitStr); if (!Parsed) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid 'limit' parameter"sv); } Limit = *Parsed; } if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty()) { const std::optional Parsed = ParseInt(OffsetStr); if (!Parsed) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid 'offset' parameter"sv); } Offset = *Parsed; } std::vector Entries = Session->GetLogEntries(Limit, Offset); CbObjectWriter Response; Response << "total" << Session->GetLogCount(); Response.BeginArray("entries"); for (const SessionsService::LogEntry& Entry : Entries) { Response.BeginObject(); WriteLogEntry(Response, Entry); Response.EndObject(); } Response.EndArray(); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } } ////////////////////////////////////////////////////////////////////////// // // WebSocket push // void HttpSessionsService::OnWebSocketOpen(Ref Connection, std::string_view RelativeUri) { ZEN_UNUSED(RelativeUri); ZEN_INFO("Sessions WebSocket client connected"); const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed); m_WsConnectionsLock.WithExclusiveLock( [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); }); } void HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) { // Expected client→server protocol is JSON text frames; see // sessions.js → _ws_send. Binary frames and malformed JSON are logged // at debug and ignored so a confused client can't disturb others. if (Msg.Opcode != WebSocketOpcode::kText) { return; } std::string_view PayloadText(static_cast(Msg.Payload.GetData()), Msg.Payload.GetSize()); std::string ParseError; json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError); if (!ParseError.empty() || !Parsed.is_object()) { ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError); return; } const std::string& Type = Parsed["type"].string_value(); if (Type == "sub_log") { const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value()); if (SessionId == Oid::Zero) { ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value()); return; } // json11 reports int via int_value() (32-bit); cursors fit easily // inside a session's lifetime so this is fine for the foreseeable // future. Negative values are treated as 0. const int CursorRaw = Parsed["cursor"].int_value(); const uint64_t Cursor = CursorRaw > 0 ? static_cast(CursorRaw) : 0; // Record the subscription and fire an immediate delta so we don't // drop entries that landed between the client's HTTP replay and // this frame. See BroadcastLogAppended for the broadcast flow. m_WsConnectionsLock.WithExclusiveLock([&] { for (WsSubscriber& Sub : m_WsConnections) { if (Sub.Connection.Get() == &Conn) { Sub.SubscribedSessionId = SessionId; Sub.LastSentCursor = Cursor; break; } } }); // Pass UINT64_MAX to force a flush even if the cursor hasn't // advanced — the subscriber's LastSentCursor may already lag the // tail (e.g. rapid posts before the client subscribed). BroadcastLogAppended(SessionId, std::numeric_limits::max()); } else if (Type == "unsub_log") { m_WsConnectionsLock.WithExclusiveLock([&] { for (WsSubscriber& Sub : m_WsConnections) { if (Sub.Connection.Get() == &Conn) { Sub.Unsubscribe(); break; } } }); } // Unknown types are silently ignored so the protocol can grow. } void HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) { ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) { return Sub.Connection.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); } void HttpSessionsService::BroadcastSessions() { // 8 inline slots covers any realistic number of concurrent UI tabs; // spills to heap beyond that. eastl::fixed_vector, 8> Connections; m_WsConnectionsLock.WithSharedLock([&] { for (const WsSubscriber& Sub : m_WsConnections) { Connections.push_back(Sub.Connection); } }); if (Connections.empty()) { return; } ExtendableStringBuilder<4096> JsonBuilder; BuildSessionListResponse().ToJson(JsonBuilder); std::string_view Json = JsonBuilder.ToView(); for (const Ref& Conn : Connections) { if (Conn->IsOpen()) { Conn->SendText(Json); } } } void HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor) { Ref Session = m_Sessions.GetSession(SessionId); if (!Session) { // Session vanished (e.g. pruned) between the append and the // broadcast. No entries to ship. return; } // Claim each subscriber's cursor and snapshot its delta atomically under // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the // race where two concurrent BroadcastLogAppended calls would both // observe the same FromCursor, fetch overlapping ranges, and ship the // subscriber duplicate entries. Sends still happen after the lock is // released to avoid holding it across async socket I/O. struct PendingSend { Ref Connection; SessionsService::Session::CursorResult Delta; bool InitialSend; // true when FromCursor == 0 }; // 8 inline slots keeps the broadcast allocation-free for the typical UI // case (1-2 tabs tailing one session); spills to heap if many clients // happen to subscribe to the same session at once. eastl::fixed_vector Sends; m_WsConnectionsLock.WithExclusiveLock([&] { for (WsSubscriber& Sub : m_WsConnections) { if (!Sub.IsSubscribedTo(SessionId)) { continue; } // Cheap gate: if the subscriber already has everything up to // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush. if (NewCursor != std::numeric_limits::max() && Sub.LastSentCursor >= NewCursor) { continue; } if (!Sub.Connection->IsOpen()) { continue; } const uint64_t FromCursor = Sub.LastSentCursor; SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor); Sub.LastSentCursor = Delta.Cursor; Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0}); } }); if (Sends.empty()) { return; } // Render the hex id into a stack buffer — CbWriter only needs a // string_view, so we avoid the 24-byte std::string allocation that // Oid::ToString() would otherwise do on every broadcast. The buffer // is StringLength + 1 because ToString writes a trailing NUL beyond // the 24 hex chars; the view itself excludes the NUL. char HexSessionIdBuf[Oid::StringLength + 1]; SessionId.ToString(HexSessionIdBuf); const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength); for (const PendingSend& Send : Sends) { if (Send.Delta.Entries.empty() && !Send.InitialSend) { // Nothing new and the subscriber was primed — nothing to send. continue; } // Binary CB frame — the client already has a CB parser // (util/compactbinary.js). CB keeps structured entries typed end- // to-end (hashes, ints, dates stay that way on the wire) and skips // JSON escaping overhead on every append. Shape mirrors the HTTP // GET response plus two routing fields (type + session). A fresh // CbObjectWriter per iteration is required because the ctor calls // BeginObject() to set up the implicit outer object — Save() then // finalizes that object, leaving the writer in a state that // Reset() doesn't restore. CbObjectWriter Response; Response << "type"sv << "log"sv; Response << "session"sv << HexSessionId; Response << "cursor"sv << Send.Delta.Cursor; Response << "count"sv << Send.Delta.Count; Response.BeginArray("entries"sv); for (const SessionsService::LogEntry& Entry : Send.Delta.Entries) { Response.BeginObject(); WriteLogEntry(Response, Entry); Response.EndObject(); } Response.EndArray(); CbObject Obj = Response.Save(); Send.Connection->SendBinary(Obj.GetView()); } } void HttpSessionsService::EnqueuePushTimer() { m_PushTimer.expires_after(std::chrono::seconds(2)); m_PushTimer.async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } BroadcastSessions(); EnqueuePushTimer(); }); } ////////////////////////////////////////////////////////////////////////// // // Periodic cleanup of expired / excess sessions // void HttpSessionsService::RunCleanup() { const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0); const size_t MaxCount = SessionsService::kDefaultMaxSessionCount; const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes; const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes); if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0) { ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)", Result.ExpiredByAge, Result.ExpiredByCount, Result.ExpiredByStorage, SessionsService::kDefaultMaxSessionAgeDays, MaxCount, MaxBytes / (1024 * 1024)); } } void HttpSessionsService::EnqueueCleanupTimer() { m_CleanupTimer.expires_after(std::chrono::hours(1)); m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } RunCleanup(); EnqueueCleanupTimer(); }); } ////////////////////////////////////////////////////////////////////////// // // Periodic liveness check for tracked local client processes // void HttpSessionsService::RunLivenessCheck() { const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness(); if (EndedByDeadClient > 0) { ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient); } else { // Debug-level so this doesn't spam at info every 30s, but lets an // operator who's specifically investigating why their crashed // session didn't clean up see whether anything is being tracked. ZEN_DEBUG("Sessions liveness: no dead client processes found"); } } void HttpSessionsService::EnqueueLivenessTimer() { // 30s strikes a balance between crash-detection latency and // per-session OpenProcess/GetExitCode overhead. Active sessions with // no reported pid (remote clients) are skipped in the inner loop so // the cost scales with local sessions only. m_LivenessTimer.expires_after(std::chrono::seconds(30)); m_LivenessTimer.async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } RunLivenessCheck(); EnqueueLivenessTimer(); }); } } // namespace zen