diff options
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 504 |
1 files changed, 460 insertions, 44 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp index 88db36828..1678ede60 100644 --- a/src/zenserver/sessions/httpsessions.cpp +++ b/src/zenserver/sessions/httpsessions.cpp @@ -7,8 +7,17 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/trace.h> +#include "logtemplate.h" #include "sessions.h" +ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_list.h> +#include <EASTL/fixed_vector.h> +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <limits> + namespace zen { using namespace std::literals; @@ -21,13 +30,21 @@ HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, , m_StatsService(StatsService) , m_Sessions(Sessions) , m_PushTimer(IoContext) +, m_CleanupTimer(IoContext) +, m_LivenessTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { + // Break the callback edge before tearing anything else down so a + // late AppendLog on another thread can't fire BroadcastLogAppended + // after our subscriber list is gone. + m_Sessions.SetLogAppendedCallback({}); m_PushTimer.cancel(); + m_CleanupTimer.cancel(); + m_LivenessTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } @@ -135,12 +152,36 @@ HttpSessionsService::Initialize() m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); + // Event-driven log push: the service fires this every time an entry + // is appended (including the synthetic "session ended" line emitted + // by RemoveSession). Subscribers receive a binary CB frame carrying + // the delta. Safe to call BroadcastLogAppended from any thread — it + // does its own locking and SendBinary is async-queued by the WS + // transport. + m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); }); + EnqueuePushTimer(); + + // Run a cleanup pass shortly after startup so freshly-loaded historical + // data is pruned even if the server doesn't stay up for an hour. + m_CleanupTimer.expires_after(std::chrono::seconds(30)); + m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunCleanup(); + EnqueueCleanupTimer(); + }); + + EnqueueLivenessTimer(); } static void -WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) +WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session) { + const SessionsService::SessionInfo& Info = Session.Info(); + Writer << "id" << Info.Id; if (!Info.AppName.empty()) { @@ -150,6 +191,18 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "mode" << Info.Mode; } + if (!Info.Platform.empty()) + { + Writer << "platform" << Info.Platform; + } + if (Info.ClientPid != 0) + { + Writer << "pid" << Info.ClientPid; + } + if (Info.ParentSessionId != Oid::Zero) + { + Writer << "parent_session_id" << Info.ParentSessionId; + } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; @@ -161,6 +214,11 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) Writer << "ended_at" << Info.EndedAt; } + if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0) + { + Writer << "log_count" << LogCount; + } + if (Info.Metadata.GetSize() > 0) { Writer.AddObject("metadata"sv, Info.Metadata); @@ -182,13 +240,13 @@ HttpSessionsService::BuildSessionListResponse() for (const Ref<SessionsService::Session>& Session : Active) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } for (const Ref<SessionsService::Session>& Session : Ended) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); @@ -231,7 +289,7 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) for (const Ref<SessionsService::Session>& Session : Sessions) { Response.BeginObject(); - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); Response.EndObject(); } Response.EndArray(); @@ -262,24 +320,51 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) { CbObject RequestObject = ServerRequest.ReadPayloadObject(); + // Render the id into a stack buffer once for any success-reply + // paths below — avoids a std::string per POST/PUT. + char IdBuf[Oid::StringLength + 1] = {}; + SessionId.ToString(IdBuf); + const std::string_view IdStr(IdBuf, Oid::StringLength); + if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); std::string Mode(RequestObject["mode"sv].AsString()); - Oid JobId = RequestObject["jobid"sv].AsObjectId(); - CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); + std::string Platform(RequestObject["platform"sv].AsString()); + Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId(); + Oid JobId = RequestObject["jobid"sv].AsObjectId(); + CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); + + // Only trust a client-reported pid when the HTTP layer + // says the request is local (unix socket or a loopback + // TCP peer). A remote client's pid refers to a different + // machine's process table — opening a local handle with + // it would at best be meaningless, at worst a liveness + // false positive. + uint32_t ClientPid = 0; + if (ServerRequest.IsLocalMachineRequest()) + { + ClientPid = RequestObject["pid"sv].AsUInt32(); + } m_SessionsStats.SessionWriteCount++; - if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) + if (m_Sessions.RegisterSession(SessionId, + std::move(AppName), + std::move(Mode), + std::move(Platform), + ClientPid, + ParentSessionId, + JobId, + MetadataView)) { - return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr); } else { // Already exists - try update instead if (m_Sessions.UpdateSession(SessionId, MetadataView)) { - return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError); } @@ -290,7 +375,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) m_SessionsStats.SessionWriteCount++; if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView())) { - return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, @@ -304,7 +389,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) if (Session) { CbObjectWriter Response; - WriteSessionInfo(Response, Session->Info()); + WriteSessionInfo(Response, *Session); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); @@ -312,7 +397,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) case HttpVerb::kDelete: { m_SessionsStats.SessionDeleteCount++; - if (m_Sessions.RemoveSession(SessionId)) + if (m_Sessions.RemoveSession(SessionId, "client request"sv)) { return ServerRequest.WriteResponse(HttpResponseCode::OK); } @@ -334,17 +419,33 @@ static void WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) { Writer << "timestamp" << Entry.Timestamp; - if (!Entry.Level.empty()) + if (Entry.Level != logging::Off) { - Writer << "level" << Entry.Level; + // Frontend renders on the string form (CSS class derives from it), so + // keep the wire format as the canonical lowercase name. + Writer << "level" << logging::ToString(Entry.Level); } - if (!Entry.Message.empty()) + const std::string_view LoggerName{Entry.LoggerName}; + if (!LoggerName.empty()) { - Writer << "message" << Entry.Message; + Writer << "logger" << LoggerName; } - if (Entry.Data.GetSize() > 0) + const std::string_view Message{Entry.Message}; + if (!Message.empty()) { - Writer.AddObject("data"sv, Entry.Data); + Writer << "message" << Message; + } + // Structured-log form alongside the rendered message so a future UI + // can offer field-level drill-down without another schema bump. The + // existing UI only looks at "message" and is unaffected. + const std::string_view Format{Entry.Format}; + if (!Format.empty()) + { + Writer << "format" << Format; + if (Entry.Fields.GetSize() > 0) + { + Writer.AddObject("fields"sv, Entry.Fields); + } } } @@ -378,12 +479,21 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) if (ServerRequest.RequestContentType() == HttpContentType::kText) { - // Raw text - split by newlines, one entry per line + // Raw text - split by newlines, one entry per line. Collect + // into a batch and append atomically: keeps a single client's + // payload contiguous on the wire even when other clients race + // in, and fires the WS push observer just once for the whole + // batch instead of once per line. IoBuffer Payload = ServerRequest.ReadPayload(); std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); const DateTime Now = DateTime::Now(); - size_t Pos = 0; + // 64 inline slots covers the typical SendLogBatch posting size + // (~50) without touching the heap. Spills to heap beyond that. + // LogEntryInput's string_views point into the request payload + // (Text), which lives for the duration of this handler. + eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch; + size_t Pos = 0; while (Pos < Text.size()) { size_t End = Text.find('\n', Pos); @@ -401,60 +511,115 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) if (!Line.empty()) { - Session->AppendLog(SessionsService::LogEntry{ + Batch.push_back(SessionsService::LogEntryInput{ .Timestamp = Now, - .Message = std::string(Line), + .Message = Line, }); } Pos = End + 1; } + m_Sessions.AppendLogBatch(SessionId, Batch); } else { - // Structured log (JSON or CbObject) - // Accepts a single record or an "entries" array of records + // Structured log (JSON or CbObject). Accepts a single record + // or an "entries" array of records — collect into a batch so + // a single POST lands atomically and fires one WS push. CbObject RequestObject = ServerRequest.ReadPayloadObject(); const DateTime Now = DateTime::Now(); + // 64 inline slots covers the typical SendLogBatch posting size + // (~50) without touching the heap. Spills to heap beyond that. + // LogEntryInput's string_views borrow from the parsed + // RequestObject's underlying buffer (the logger / message / + // format strings on the wire); we keep RequestObject alive + // for the whole intake. + eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch; + + // Stable backing for messages we render from a structured + // template. fixed_list never moves nodes on insertion, so + // string_views into these strings stay valid until the list + // is destroyed at handler exit. 64 inline nodes match the + // batch's fixed-vector inline cap; spills to heap if a POST + // brings more. + eastl::fixed_list<std::string, 64> RenderedMessages; + auto AppendFromObject = [&](CbObjectView Obj) { - CbFieldView LevelField = Obj["level"sv]; - std::string_view Level; + CbFieldView LevelField = Obj["level"sv]; + logging::LogLevel Level = logging::Off; if (LevelField.IsString()) { - Level = LevelField.AsString(); + Level = logging::ParseLogLevelString(LevelField.AsString()); } else if (LevelField.IsInteger()) { int32_t LevelInt = LevelField.AsInt32(); if (LevelInt >= 0 && LevelInt < logging::LogLevelCount) { - Level = logging::ToString(static_cast<logging::LogLevel>(LevelInt)); + Level = static_cast<logging::LogLevel>(LevelInt); } } - std::string Message(Obj["message"sv].AsString()); - CbObjectView DataView = Obj["data"sv].AsObjectView(); - - Session->AppendLog(SessionsService::LogEntry{ - .Timestamp = Now, - .Level = std::string(Level), - .Message = std::move(Message), - .Data = CbObject::Clone(DataView), + const std::string_view LoggerName = Obj["logger"sv].AsString(); + + // Two entry shapes. Structured entries carry `format` + + // `fields` and no `message` — we render the template right + // here so the rest of the pipeline (in-memory deque, + // persisted log.bin, UI GET response) keeps working the + // same way for both shapes. + CbFieldView FormatField = Obj["format"sv]; + if (FormatField.IsString()) + { + const std::string_view FormatView = FormatField.AsString(); + CbObjectView FieldsView = Obj["fields"sv].AsObjectView(); + ExtendableStringBuilder<256> RenderedBuilder; + RenderLogTemplate(FormatView, FieldsView, RenderedBuilder); + + // Anchor the rendered string in the stable list so the + // LogEntryInput's view into it stays valid until the + // AppendLogBatch call below. + RenderedMessages.emplace_back(RenderedBuilder.ToView()); + const std::string& StoredRendered = RenderedMessages.back(); + + Batch.push_back(SessionsService::LogEntryInput{ + .Timestamp = Now, + .Level = Level, + .LoggerName = LoggerName, + .Message = StoredRendered, + .Format = FormatView, + .Fields = CbObject::Clone(FieldsView), + }); + return; + } + + // Plain entry. + Batch.push_back(SessionsService::LogEntryInput{ + .Timestamp = Now, + .Level = Level, + .LoggerName = LoggerName, + .Message = Obj["message"sv].AsString(), }); }; CbFieldView EntriesField = RequestObject["entries"sv]; if (EntriesField.IsArray()) { - for (CbFieldView Entry : EntriesField) + // Pre-reserve so the 50-ish entries from a typical + // SendLogBatch don't trigger 4-5 reallocations as the + // vector grows. + CbArrayView Arr = EntriesField.AsArrayView(); + Batch.reserve(Arr.Num()); + for (CbFieldView Entry : Arr) { AppendFromObject(Entry.AsObjectView()); } } else { + Batch.reserve(1); AppendFromObject(RequestObject); } + m_Sessions.AppendLogBatch(SessionId, Batch); } return ServerRequest.WriteResponse(HttpResponseCode::OK); @@ -547,13 +712,78 @@ HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::s { ZEN_UNUSED(RelativeUri); ZEN_INFO("Sessions WebSocket client connected"); - m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); + const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed); + m_WsConnectionsLock.WithExclusiveLock( + [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); }); } void -HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) { - // No client-to-server messages expected + // Expected client→server protocol is JSON text frames; see + // sessions.js → _ws_send. Binary frames and malformed JSON are logged + // at debug and ignored so a confused client can't disturb others. + if (Msg.Opcode != WebSocketOpcode::kText) + { + return; + } + std::string_view PayloadText(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize()); + std::string ParseError; + json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError); + if (!ParseError.empty() || !Parsed.is_object()) + { + ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError); + return; + } + + const std::string& Type = Parsed["type"].string_value(); + if (Type == "sub_log") + { + const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value()); + if (SessionId == Oid::Zero) + { + ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value()); + return; + } + // json11 reports int via int_value() (32-bit); cursors fit easily + // inside a session's lifetime so this is fine for the foreseeable + // future. Negative values are treated as 0. + const int CursorRaw = Parsed["cursor"].int_value(); + const uint64_t Cursor = CursorRaw > 0 ? static_cast<uint64_t>(CursorRaw) : 0; + + // Record the subscription and fire an immediate delta so we don't + // drop entries that landed between the client's HTTP replay and + // this frame. See BroadcastLogAppended for the broadcast flow. + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (Sub.Connection.Get() == &Conn) + { + Sub.SubscribedSessionId = SessionId; + Sub.LastSentCursor = Cursor; + break; + } + } + }); + // Pass UINT64_MAX to force a flush even if the cursor hasn't + // advanced — the subscriber's LastSentCursor may already lag the + // tail (e.g. rapid posts before the client subscribed). + BroadcastLogAppended(SessionId, std::numeric_limits<uint64_t>::max()); + } + else if (Type == "unsub_log") + { + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (Sub.Connection.Get() == &Conn) + { + Sub.Unsubscribe(); + break; + } + } + }); + } + // Unknown types are silently ignored so the protocol can grow. } void @@ -561,8 +791,8 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused] { ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { - auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { - return C.Get() == &Conn; + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) { + return Sub.Connection.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); @@ -571,8 +801,15 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused] void HttpSessionsService::BroadcastSessions() { - std::vector<Ref<WebSocketConnection>> Connections; - m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + // 8 inline slots covers any realistic number of concurrent UI tabs; + // spills to heap beyond that. + eastl::fixed_vector<Ref<WebSocketConnection>, 8> Connections; + m_WsConnectionsLock.WithSharedLock([&] { + for (const WsSubscriber& Sub : m_WsConnections) + { + Connections.push_back(Sub.Connection); + } + }); if (Connections.empty()) { @@ -593,6 +830,107 @@ HttpSessionsService::BroadcastSessions() } void +HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor) +{ + Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId); + if (!Session) + { + // Session vanished (e.g. pruned) between the append and the + // broadcast. No entries to ship. + return; + } + + // Claim each subscriber's cursor and snapshot its delta atomically under + // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather + // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the + // race where two concurrent BroadcastLogAppended calls would both + // observe the same FromCursor, fetch overlapping ranges, and ship the + // subscriber duplicate entries. Sends still happen after the lock is + // released to avoid holding it across async socket I/O. + struct PendingSend + { + Ref<WebSocketConnection> Connection; + SessionsService::Session::CursorResult Delta; + bool InitialSend; // true when FromCursor == 0 + }; + // 8 inline slots keeps the broadcast allocation-free for the typical UI + // case (1-2 tabs tailing one session); spills to heap if many clients + // happen to subscribe to the same session at once. + eastl::fixed_vector<PendingSend, 8> Sends; + m_WsConnectionsLock.WithExclusiveLock([&] { + for (WsSubscriber& Sub : m_WsConnections) + { + if (!Sub.IsSubscribedTo(SessionId)) + { + continue; + } + // Cheap gate: if the subscriber already has everything up to + // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush. + if (NewCursor != std::numeric_limits<uint64_t>::max() && Sub.LastSentCursor >= NewCursor) + { + continue; + } + if (!Sub.Connection->IsOpen()) + { + continue; + } + const uint64_t FromCursor = Sub.LastSentCursor; + SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor); + Sub.LastSentCursor = Delta.Cursor; + Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0}); + } + }); + if (Sends.empty()) + { + return; + } + + // Render the hex id into a stack buffer — CbWriter only needs a + // string_view, so we avoid the 24-byte std::string allocation that + // Oid::ToString() would otherwise do on every broadcast. The buffer + // is StringLength + 1 because ToString writes a trailing NUL beyond + // the 24 hex chars; the view itself excludes the NUL. + char HexSessionIdBuf[Oid::StringLength + 1]; + SessionId.ToString(HexSessionIdBuf); + const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength); + for (const PendingSend& Send : Sends) + { + if (Send.Delta.Entries.empty() && !Send.InitialSend) + { + // Nothing new and the subscriber was primed — nothing to send. + continue; + } + + // Binary CB frame — the client already has a CB parser + // (util/compactbinary.js). CB keeps structured entries typed end- + // to-end (hashes, ints, dates stay that way on the wire) and skips + // JSON escaping overhead on every append. Shape mirrors the HTTP + // GET response plus two routing fields (type + session). A fresh + // CbObjectWriter per iteration is required because the ctor calls + // BeginObject() to set up the implicit outer object — Save() then + // finalizes that object, leaving the writer in a state that + // Reset() doesn't restore. + CbObjectWriter Response; + Response << "type"sv + << "log"sv; + Response << "session"sv << HexSessionId; + Response << "cursor"sv << Send.Delta.Cursor; + Response << "count"sv << Send.Delta.Count; + Response.BeginArray("entries"sv); + for (const SessionsService::LogEntry& Entry : Send.Delta.Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + CbObject Obj = Response.Save(); + Send.Connection->SendBinary(Obj.GetView()); + } +} + +void HttpSessionsService::EnqueuePushTimer() { m_PushTimer.expires_after(std::chrono::seconds(2)); @@ -607,4 +945,82 @@ HttpSessionsService::EnqueuePushTimer() }); } +////////////////////////////////////////////////////////////////////////// +// +// Periodic cleanup of expired / excess sessions +// + +void +HttpSessionsService::RunCleanup() +{ + const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0); + const size_t MaxCount = SessionsService::kDefaultMaxSessionCount; + const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes; + const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes); + if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0) + { + ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)", + Result.ExpiredByAge, + Result.ExpiredByCount, + Result.ExpiredByStorage, + SessionsService::kDefaultMaxSessionAgeDays, + MaxCount, + MaxBytes / (1024 * 1024)); + } +} + +void +HttpSessionsService::EnqueueCleanupTimer() +{ + m_CleanupTimer.expires_after(std::chrono::hours(1)); + m_CleanupTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunCleanup(); + EnqueueCleanupTimer(); + }); +} + +////////////////////////////////////////////////////////////////////////// +// +// Periodic liveness check for tracked local client processes +// + +void +HttpSessionsService::RunLivenessCheck() +{ + const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness(); + if (EndedByDeadClient > 0) + { + ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient); + } + else + { + // Debug-level so this doesn't spam at info every 30s, but lets an + // operator who's specifically investigating why their crashed + // session didn't clean up see whether anything is being tracked. + ZEN_DEBUG("Sessions liveness: no dead client processes found"); + } +} + +void +HttpSessionsService::EnqueueLivenessTimer() +{ + // 30s strikes a balance between crash-detection latency and + // per-session OpenProcess/GetExitCode overhead. Active sessions with + // no reported pid (remote clients) are skipped in the inner loop so + // the cost scales with local sessions only. + m_LivenessTimer.expires_after(std::chrono::seconds(30)); + m_LivenessTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + RunLivenessCheck(); + EnqueueLivenessTimer(); + }); +} + } // namespace zen |