aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/sessions')
-rw-r--r--src/zenserver/sessions/httpsessions.cpp504
-rw-r--r--src/zenserver/sessions/httpsessions.h60
-rw-r--r--src/zenserver/sessions/inprocsessionlogsink.cpp37
-rw-r--r--src/zenserver/sessions/logtemplate.cpp390
-rw-r--r--src/zenserver/sessions/logtemplate.h42
-rw-r--r--src/zenserver/sessions/sessions.cpp1166
-rw-r--r--src/zenserver/sessions/sessions.h193
7 files changed, 2262 insertions, 130 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 88db36828..1678ede60 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -7,8 +7,17 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/trace.h>
+#include "logtemplate.h"
#include "sessions.h"
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_list.h>
+#include <EASTL/fixed_vector.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <limits>
+
namespace zen {
using namespace std::literals;
@@ -21,13 +30,21 @@ HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
, m_StatsService(StatsService)
, m_Sessions(Sessions)
, m_PushTimer(IoContext)
+, m_CleanupTimer(IoContext)
+, m_LivenessTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ // Break the callback edge before tearing anything else down so a
+ // late AppendLog on another thread can't fire BroadcastLogAppended
+ // after our subscriber list is gone.
+ m_Sessions.SetLogAppendedCallback({});
m_PushTimer.cancel();
+ m_CleanupTimer.cancel();
+ m_LivenessTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -135,12 +152,36 @@ HttpSessionsService::Initialize()
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+ // Event-driven log push: the service fires this every time an entry
+ // is appended (including the synthetic "session ended" line emitted
+ // by RemoveSession). Subscribers receive a binary CB frame carrying
+ // the delta. Safe to call BroadcastLogAppended from any thread — it
+ // does its own locking and SendBinary is async-queued by the WS
+ // transport.
+ m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); });
+
EnqueuePushTimer();
+
+ // Run a cleanup pass shortly after startup so freshly-loaded historical
+ // data is pruned even if the server doesn't stay up for an hour.
+ m_CleanupTimer.expires_after(std::chrono::seconds(30));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+
+ EnqueueLivenessTimer();
}
static void
-WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
+WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session)
{
+ const SessionsService::SessionInfo& Info = Session.Info();
+
Writer << "id" << Info.Id;
if (!Info.AppName.empty())
{
@@ -150,6 +191,18 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "mode" << Info.Mode;
}
+ if (!Info.Platform.empty())
+ {
+ Writer << "platform" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
@@ -161,6 +214,11 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
Writer << "ended_at" << Info.EndedAt;
}
+ if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0)
+ {
+ Writer << "log_count" << LogCount;
+ }
+
if (Info.Metadata.GetSize() > 0)
{
Writer.AddObject("metadata"sv, Info.Metadata);
@@ -182,13 +240,13 @@ HttpSessionsService::BuildSessionListResponse()
for (const Ref<SessionsService::Session>& Session : Active)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
for (const Ref<SessionsService::Session>& Session : Ended)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -231,7 +289,7 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -262,24 +320,51 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
{
CbObject RequestObject = ServerRequest.ReadPayloadObject();
+ // Render the id into a stack buffer once for any success-reply
+ // paths below — avoids a std::string per POST/PUT.
+ char IdBuf[Oid::StringLength + 1] = {};
+ SessionId.ToString(IdBuf);
+ const std::string_view IdStr(IdBuf, Oid::StringLength);
+
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
std::string Mode(RequestObject["mode"sv].AsString());
- Oid JobId = RequestObject["jobid"sv].AsObjectId();
- CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+ std::string Platform(RequestObject["platform"sv].AsString());
+ Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId();
+ Oid JobId = RequestObject["jobid"sv].AsObjectId();
+ CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+
+ // Only trust a client-reported pid when the HTTP layer
+ // says the request is local (unix socket or a loopback
+ // TCP peer). A remote client's pid refers to a different
+ // machine's process table — opening a local handle with
+ // it would at best be meaningless, at worst a liveness
+ // false positive.
+ uint32_t ClientPid = 0;
+ if (ServerRequest.IsLocalMachineRequest())
+ {
+ ClientPid = RequestObject["pid"sv].AsUInt32();
+ }
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId,
+ std::move(AppName),
+ std::move(Mode),
+ std::move(Platform),
+ ClientPid,
+ ParentSessionId,
+ JobId,
+ MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr);
}
else
{
// Already exists - try update instead
if (m_Sessions.UpdateSession(SessionId, MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError);
}
@@ -290,7 +375,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionWriteCount++;
if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView()))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
HttpContentType::kText,
@@ -304,7 +389,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
if (Session)
{
CbObjectWriter Response;
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -312,7 +397,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kDelete:
{
m_SessionsStats.SessionDeleteCount++;
- if (m_Sessions.RemoveSession(SessionId))
+ if (m_Sessions.RemoveSession(SessionId, "client request"sv))
{
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -334,17 +419,33 @@ static void
WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry)
{
Writer << "timestamp" << Entry.Timestamp;
- if (!Entry.Level.empty())
+ if (Entry.Level != logging::Off)
{
- Writer << "level" << Entry.Level;
+ // Frontend renders on the string form (CSS class derives from it), so
+ // keep the wire format as the canonical lowercase name.
+ Writer << "level" << logging::ToString(Entry.Level);
}
- if (!Entry.Message.empty())
+ const std::string_view LoggerName{Entry.LoggerName};
+ if (!LoggerName.empty())
{
- Writer << "message" << Entry.Message;
+ Writer << "logger" << LoggerName;
}
- if (Entry.Data.GetSize() > 0)
+ const std::string_view Message{Entry.Message};
+ if (!Message.empty())
{
- Writer.AddObject("data"sv, Entry.Data);
+ Writer << "message" << Message;
+ }
+ // Structured-log form alongside the rendered message so a future UI
+ // can offer field-level drill-down without another schema bump. The
+ // existing UI only looks at "message" and is unaffected.
+ const std::string_view Format{Entry.Format};
+ if (!Format.empty())
+ {
+ Writer << "format" << Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("fields"sv, Entry.Fields);
+ }
}
}
@@ -378,12 +479,21 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (ServerRequest.RequestContentType() == HttpContentType::kText)
{
- // Raw text - split by newlines, one entry per line
+ // Raw text - split by newlines, one entry per line. Collect
+ // into a batch and append atomically: keeps a single client's
+ // payload contiguous on the wire even when other clients race
+ // in, and fires the WS push observer just once for the whole
+ // batch instead of once per line.
IoBuffer Payload = ServerRequest.ReadPayload();
std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
const DateTime Now = DateTime::Now();
- size_t Pos = 0;
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views point into the request payload
+ // (Text), which lives for the duration of this handler.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+ size_t Pos = 0;
while (Pos < Text.size())
{
size_t End = Text.find('\n', Pos);
@@ -401,60 +511,115 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (!Line.empty())
{
- Session->AppendLog(SessionsService::LogEntry{
+ Batch.push_back(SessionsService::LogEntryInput{
.Timestamp = Now,
- .Message = std::string(Line),
+ .Message = Line,
});
}
Pos = End + 1;
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
else
{
- // Structured log (JSON or CbObject)
- // Accepts a single record or an "entries" array of records
+ // Structured log (JSON or CbObject). Accepts a single record
+ // or an "entries" array of records — collect into a batch so
+ // a single POST lands atomically and fires one WS push.
CbObject RequestObject = ServerRequest.ReadPayloadObject();
const DateTime Now = DateTime::Now();
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views borrow from the parsed
+ // RequestObject's underlying buffer (the logger / message /
+ // format strings on the wire); we keep RequestObject alive
+ // for the whole intake.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+
+ // Stable backing for messages we render from a structured
+ // template. fixed_list never moves nodes on insertion, so
+ // string_views into these strings stay valid until the list
+ // is destroyed at handler exit. 64 inline nodes match the
+ // batch's fixed-vector inline cap; spills to heap if a POST
+ // brings more.
+ eastl::fixed_list<std::string, 64> RenderedMessages;
+
auto AppendFromObject = [&](CbObjectView Obj) {
- CbFieldView LevelField = Obj["level"sv];
- std::string_view Level;
+ CbFieldView LevelField = Obj["level"sv];
+ logging::LogLevel Level = logging::Off;
if (LevelField.IsString())
{
- Level = LevelField.AsString();
+ Level = logging::ParseLogLevelString(LevelField.AsString());
}
else if (LevelField.IsInteger())
{
int32_t LevelInt = LevelField.AsInt32();
if (LevelInt >= 0 && LevelInt < logging::LogLevelCount)
{
- Level = logging::ToString(static_cast<logging::LogLevel>(LevelInt));
+ Level = static_cast<logging::LogLevel>(LevelInt);
}
}
- std::string Message(Obj["message"sv].AsString());
- CbObjectView DataView = Obj["data"sv].AsObjectView();
-
- Session->AppendLog(SessionsService::LogEntry{
- .Timestamp = Now,
- .Level = std::string(Level),
- .Message = std::move(Message),
- .Data = CbObject::Clone(DataView),
+ const std::string_view LoggerName = Obj["logger"sv].AsString();
+
+ // Two entry shapes. Structured entries carry `format` +
+ // `fields` and no `message` — we render the template right
+ // here so the rest of the pipeline (in-memory deque,
+ // persisted log.bin, UI GET response) keeps working the
+ // same way for both shapes.
+ CbFieldView FormatField = Obj["format"sv];
+ if (FormatField.IsString())
+ {
+ const std::string_view FormatView = FormatField.AsString();
+ CbObjectView FieldsView = Obj["fields"sv].AsObjectView();
+ ExtendableStringBuilder<256> RenderedBuilder;
+ RenderLogTemplate(FormatView, FieldsView, RenderedBuilder);
+
+ // Anchor the rendered string in the stable list so the
+ // LogEntryInput's view into it stays valid until the
+ // AppendLogBatch call below.
+ RenderedMessages.emplace_back(RenderedBuilder.ToView());
+ const std::string& StoredRendered = RenderedMessages.back();
+
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = StoredRendered,
+ .Format = FormatView,
+ .Fields = CbObject::Clone(FieldsView),
+ });
+ return;
+ }
+
+ // Plain entry.
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = Obj["message"sv].AsString(),
});
};
CbFieldView EntriesField = RequestObject["entries"sv];
if (EntriesField.IsArray())
{
- for (CbFieldView Entry : EntriesField)
+ // Pre-reserve so the 50-ish entries from a typical
+ // SendLogBatch don't trigger 4-5 reallocations as the
+ // vector grows.
+ CbArrayView Arr = EntriesField.AsArrayView();
+ Batch.reserve(Arr.Num());
+ for (CbFieldView Entry : Arr)
{
AppendFromObject(Entry.AsObjectView());
}
}
else
{
+ Batch.reserve(1);
AppendFromObject(RequestObject);
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
return ServerRequest.WriteResponse(HttpResponseCode::OK);
@@ -547,13 +712,78 @@ HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::s
{
ZEN_UNUSED(RelativeUri);
ZEN_INFO("Sessions WebSocket client connected");
- m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+ const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed);
+ m_WsConnectionsLock.WithExclusiveLock(
+ [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); });
}
void
-HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg)
{
- // No client-to-server messages expected
+ // Expected client→server protocol is JSON text frames; see
+ // sessions.js → _ws_send. Binary frames and malformed JSON are logged
+ // at debug and ignored so a confused client can't disturb others.
+ if (Msg.Opcode != WebSocketOpcode::kText)
+ {
+ return;
+ }
+ std::string_view PayloadText(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize());
+ std::string ParseError;
+ json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError);
+ if (!ParseError.empty() || !Parsed.is_object())
+ {
+ ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError);
+ return;
+ }
+
+ const std::string& Type = Parsed["type"].string_value();
+ if (Type == "sub_log")
+ {
+ const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value());
+ if (SessionId == Oid::Zero)
+ {
+ ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value());
+ return;
+ }
+ // json11 reports int via int_value() (32-bit); cursors fit easily
+ // inside a session's lifetime so this is fine for the foreseeable
+ // future. Negative values are treated as 0.
+ const int CursorRaw = Parsed["cursor"].int_value();
+ const uint64_t Cursor = CursorRaw > 0 ? static_cast<uint64_t>(CursorRaw) : 0;
+
+ // Record the subscription and fire an immediate delta so we don't
+ // drop entries that landed between the client's HTTP replay and
+ // this frame. See BroadcastLogAppended for the broadcast flow.
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.SubscribedSessionId = SessionId;
+ Sub.LastSentCursor = Cursor;
+ break;
+ }
+ }
+ });
+ // Pass UINT64_MAX to force a flush even if the cursor hasn't
+ // advanced — the subscriber's LastSentCursor may already lag the
+ // tail (e.g. rapid posts before the client subscribed).
+ BroadcastLogAppended(SessionId, std::numeric_limits<uint64_t>::max());
+ }
+ else if (Type == "unsub_log")
+ {
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.Unsubscribe();
+ break;
+ }
+ }
+ });
+ }
+ // Unknown types are silently ignored so the protocol can grow.
}
void
@@ -561,8 +791,8 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
{
ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
m_WsConnectionsLock.WithExclusiveLock([&] {
- auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
- return C.Get() == &Conn;
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) {
+ return Sub.Connection.Get() == &Conn;
});
m_WsConnections.erase(It, m_WsConnections.end());
});
@@ -571,8 +801,15 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
void
HttpSessionsService::BroadcastSessions()
{
- std::vector<Ref<WebSocketConnection>> Connections;
- m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+ // 8 inline slots covers any realistic number of concurrent UI tabs;
+ // spills to heap beyond that.
+ eastl::fixed_vector<Ref<WebSocketConnection>, 8> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] {
+ for (const WsSubscriber& Sub : m_WsConnections)
+ {
+ Connections.push_back(Sub.Connection);
+ }
+ });
if (Connections.empty())
{
@@ -593,6 +830,107 @@ HttpSessionsService::BroadcastSessions()
}
void
+HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor)
+{
+ Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId);
+ if (!Session)
+ {
+ // Session vanished (e.g. pruned) between the append and the
+ // broadcast. No entries to ship.
+ return;
+ }
+
+ // Claim each subscriber's cursor and snapshot its delta atomically under
+ // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather
+ // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the
+ // race where two concurrent BroadcastLogAppended calls would both
+ // observe the same FromCursor, fetch overlapping ranges, and ship the
+ // subscriber duplicate entries. Sends still happen after the lock is
+ // released to avoid holding it across async socket I/O.
+ struct PendingSend
+ {
+ Ref<WebSocketConnection> Connection;
+ SessionsService::Session::CursorResult Delta;
+ bool InitialSend; // true when FromCursor == 0
+ };
+ // 8 inline slots keeps the broadcast allocation-free for the typical UI
+ // case (1-2 tabs tailing one session); spills to heap if many clients
+ // happen to subscribe to the same session at once.
+ eastl::fixed_vector<PendingSend, 8> Sends;
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (!Sub.IsSubscribedTo(SessionId))
+ {
+ continue;
+ }
+ // Cheap gate: if the subscriber already has everything up to
+ // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush.
+ if (NewCursor != std::numeric_limits<uint64_t>::max() && Sub.LastSentCursor >= NewCursor)
+ {
+ continue;
+ }
+ if (!Sub.Connection->IsOpen())
+ {
+ continue;
+ }
+ const uint64_t FromCursor = Sub.LastSentCursor;
+ SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor);
+ Sub.LastSentCursor = Delta.Cursor;
+ Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0});
+ }
+ });
+ if (Sends.empty())
+ {
+ return;
+ }
+
+ // Render the hex id into a stack buffer — CbWriter only needs a
+ // string_view, so we avoid the 24-byte std::string allocation that
+ // Oid::ToString() would otherwise do on every broadcast. The buffer
+ // is StringLength + 1 because ToString writes a trailing NUL beyond
+ // the 24 hex chars; the view itself excludes the NUL.
+ char HexSessionIdBuf[Oid::StringLength + 1];
+ SessionId.ToString(HexSessionIdBuf);
+ const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength);
+ for (const PendingSend& Send : Sends)
+ {
+ if (Send.Delta.Entries.empty() && !Send.InitialSend)
+ {
+ // Nothing new and the subscriber was primed — nothing to send.
+ continue;
+ }
+
+ // Binary CB frame — the client already has a CB parser
+ // (util/compactbinary.js). CB keeps structured entries typed end-
+ // to-end (hashes, ints, dates stay that way on the wire) and skips
+ // JSON escaping overhead on every append. Shape mirrors the HTTP
+ // GET response plus two routing fields (type + session). A fresh
+ // CbObjectWriter per iteration is required because the ctor calls
+ // BeginObject() to set up the implicit outer object — Save() then
+ // finalizes that object, leaving the writer in a state that
+ // Reset() doesn't restore.
+ CbObjectWriter Response;
+ Response << "type"sv
+ << "log"sv;
+ Response << "session"sv << HexSessionId;
+ Response << "cursor"sv << Send.Delta.Cursor;
+ Response << "count"sv << Send.Delta.Count;
+ Response.BeginArray("entries"sv);
+ for (const SessionsService::LogEntry& Entry : Send.Delta.Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ CbObject Obj = Response.Save();
+ Send.Connection->SendBinary(Obj.GetView());
+ }
+}
+
+void
HttpSessionsService::EnqueuePushTimer()
{
m_PushTimer.expires_after(std::chrono::seconds(2));
@@ -607,4 +945,82 @@ HttpSessionsService::EnqueuePushTimer()
});
}
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic cleanup of expired / excess sessions
+//
+
+void
+HttpSessionsService::RunCleanup()
+{
+ const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0);
+ const size_t MaxCount = SessionsService::kDefaultMaxSessionCount;
+ const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes;
+ const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes);
+ if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0)
+ {
+ ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)",
+ Result.ExpiredByAge,
+ Result.ExpiredByCount,
+ Result.ExpiredByStorage,
+ SessionsService::kDefaultMaxSessionAgeDays,
+ MaxCount,
+ MaxBytes / (1024 * 1024));
+ }
+}
+
+void
+HttpSessionsService::EnqueueCleanupTimer()
+{
+ m_CleanupTimer.expires_after(std::chrono::hours(1));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic liveness check for tracked local client processes
+//
+
+void
+HttpSessionsService::RunLivenessCheck()
+{
+ const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness();
+ if (EndedByDeadClient > 0)
+ {
+ ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient);
+ }
+ else
+ {
+ // Debug-level so this doesn't spam at info every 30s, but lets an
+ // operator who's specifically investigating why their crashed
+ // session didn't clean up see whether anything is being tracked.
+ ZEN_DEBUG("Sessions liveness: no dead client processes found");
+ }
+}
+
+void
+HttpSessionsService::EnqueueLivenessTimer()
+{
+ // 30s strikes a balance between crash-detection latency and
+ // per-session OpenProcess/GetExitCode overhead. Active sessions with
+ // no reported pid (remote clients) are skipped in the inner loop so
+ // the cost scales with local sessions only.
+ m_LivenessTimer.expires_after(std::chrono::seconds(30));
+ m_LivenessTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunLivenessCheck();
+ EnqueueLivenessTimer();
+ });
+}
+
} // namespace zen
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h
index 6ebe61c8d..2c0185176 100644
--- a/src/zenserver/sessions/httpsessions.h
+++ b/src/zenserver/sessions/httpsessions.h
@@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
+
namespace zen {
class SessionsService;
@@ -69,14 +71,64 @@ private:
SessionsStats m_SessionsStats;
metrics::OperationTiming m_HttpRequests;
- // WebSocket push
- RwLock m_WsConnectionsLock;
- std::vector<Ref<WebSocketConnection>> m_WsConnections;
- asio::steady_timer m_PushTimer;
+ // WebSocket push.
+ //
+ // Each connection can subscribe to a single session's log stream. The
+ // subscription is optional (SubscribedSessionId == Oid::Zero means
+ // "session-list broadcasts only"). LastSentCursor is the cursor value
+ // most recently delivered for the subscribed session; the broadcaster
+ // uses it to pull the correct delta from the service.
+ //
+ // Id is a process-monotonic generation token assigned at OnWebSocket-
+ // Open. Pointer matching is fine for OnWebSocketMessage /
+ // OnWebSocketClose where the live `WebSocketConnection&` parameter is
+ // unambiguous; Id-based matching is reserved for any future code path
+ // that wants to refer to a subscriber across lock releases without
+ // risking a slot-reuse mix-up.
+ struct WsSubscriber
+ {
+ Ref<WebSocketConnection> Connection;
+ uint64_t Id = 0;
+ Oid SubscribedSessionId = Oid::Zero;
+ uint64_t LastSentCursor = 0;
+
+ // True iff the subscriber is currently subscribed to `Session`.
+ // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel
+ // check that broadcaster, cursor-bump path, and any future filter
+ // would otherwise each open-code.
+ bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; }
+
+ // Drop the active subscription. After this returns, IsSubscribedTo
+ // is false for every session id.
+ void Unsubscribe()
+ {
+ SubscribedSessionId = Oid::Zero;
+ LastSentCursor = 0;
+ }
+ };
+ RwLock m_WsConnectionsLock;
+ std::vector<WsSubscriber> m_WsConnections;
+ std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned"
+ asio::steady_timer m_PushTimer;
void BroadcastSessions();
void EnqueuePushTimer();
+ // Event-driven log push. Called from SessionsService's log-appended
+ // callback; iterates subscribers of the given session and ships any
+ // entries they haven't seen yet.
+ void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor);
+
+ // Periodic cleanup of old / excess ended sessions
+ asio::steady_timer m_CleanupTimer;
+ void EnqueueCleanupTimer();
+ void RunCleanup();
+
+ // Periodic client-process liveness check for locally-connected sessions.
+ asio::steady_timer m_LivenessTimer;
+ void EnqueueLivenessTimer();
+ void RunLivenessCheck();
+
Oid m_SelfSessionId = Oid::Zero;
CbObject BuildSessionListResponse();
diff --git a/src/zenserver/sessions/inprocsessionlogsink.cpp b/src/zenserver/sessions/inprocsessionlogsink.cpp
index 04c5f7312..c935522bc 100644
--- a/src/zenserver/sessions/inprocsessionlogsink.cpp
+++ b/src/zenserver/sessions/inprocsessionlogsink.cpp
@@ -12,28 +12,31 @@ static constexpr uint64_t UnixEpochBiasSeconds = uint64_t(double(1970 - 1) * 365
static DateTime
TimePointToDateTime(logging::LogClock::time_point Time)
{
- auto Duration = Time.time_since_epoch();
- auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration);
- uint64_t Ticks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond;
- return DateTime{Ticks};
+ // DateTime ticks are 100 ns each. Splitting the time_point into whole-second
+ // and sub-second parts and converting both lets us preserve sub-second
+ // precision; the previous implementation truncated to seconds, which made
+ // every entry land at .000 ms in tail / dashboard renderings.
+ auto Duration = Time.time_since_epoch();
+ auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration);
+ auto SubSecondNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(Duration - Seconds);
+ uint64_t SecondsTicks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond;
+ uint64_t SubSecondTicks = static_cast<uint64_t>(SubSecondNanos.count()) / static_cast<uint64_t>(TimeSpan::NanosecondsPerTick);
+ return DateTime{SecondsTicks + SubSecondTicks};
}
void
InProcSessionLogSink::Log(const logging::LogMessage& Msg)
{
- Ref<SessionsService::Session> Session = m_Service.GetSession(m_SessionId);
- if (!Session)
- {
- return;
- }
-
- SessionsService::LogEntry Entry{
- .Timestamp = TimePointToDateTime(Msg.GetTime()),
- .Level = std::string(logging::ToString(Msg.GetLevel())),
- .Message = std::string(Msg.GetPayload()),
- };
-
- Session->AppendLog(std::move(Entry));
+ // Route through the service-level AppendLog so the log-appended
+ // callback fires — otherwise WS subscribers tailing the self-session
+ // don't see in-proc lines until they reload and re-fetch via HTTP.
+ m_Service.AppendLog(m_SessionId,
+ SessionsService::LogEntryInput{
+ .Timestamp = TimePointToDateTime(Msg.GetTime()),
+ .Level = Msg.GetLevel(),
+ .LoggerName = Msg.GetLoggerName(),
+ .Message = Msg.GetPayload(),
+ });
}
} // namespace zen
diff --git a/src/zenserver/sessions/logtemplate.cpp b/src/zenserver/sessions/logtemplate.cpp
new file mode 100644
index 000000000..b4d8f37e8
--- /dev/null
+++ b/src/zenserver/sessions/logtemplate.cpp
@@ -0,0 +1,390 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "logtemplate.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/guid.h>
+#include <zencore/iohash.h>
+#include <zencore/string.h>
+#include <zencore/uid.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+using namespace std::literals;
+
+namespace {
+
+ // Bounded recursion so pathological nesting (e.g. an object that references
+ // itself through $format) can't stack-overflow the server. Depth counts
+ // every nested template expansion OR value descent.
+ constexpr size_t kMaxRecursionDepth = 16;
+
+ void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth);
+ void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth);
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Path resolution: walk a UE field_path — `name` followed by zero or
+ // more `.name` / `[N]` segments — starting from the fields root. Returns
+ // an empty field on any miss.
+ //
+
+ CbFieldView ResolvePath(CbObjectView Root, std::string_view Path)
+ {
+ CbFieldView Cur;
+ bool Entered = false; // have we applied at least one segment?
+
+ const auto ApplyName = [&](std::string_view Name) -> bool {
+ if (Name.empty())
+ {
+ return false;
+ }
+ if (!Entered)
+ {
+ Cur = Root[Name];
+ }
+ else
+ {
+ if (!Cur.IsObject())
+ {
+ return false;
+ }
+ Cur = Cur.AsObjectView()[Name];
+ }
+ Entered = true;
+ return Cur.operator bool();
+ };
+
+ const auto ApplyIndex = [&](uint64_t Idx) -> bool {
+ if (!Entered || !Cur.IsArray())
+ {
+ return false;
+ }
+ uint64_t N = 0;
+ for (CbFieldView Elem : Cur.AsArrayView().CreateViewIterator())
+ {
+ if (N == Idx)
+ {
+ Cur = Elem;
+ return Cur.operator bool();
+ }
+ ++N;
+ }
+ return false;
+ };
+
+ size_t i = 0;
+ while (i < Path.size())
+ {
+ const char C = Path[i];
+ if (C == '.')
+ {
+ ++i;
+ continue;
+ }
+ if (C == '[')
+ {
+ const size_t End = Path.find(']', i + 1);
+ if (End == std::string_view::npos)
+ {
+ return {};
+ }
+ uint64_t Idx = 0;
+ for (size_t j = i + 1; j < End; ++j)
+ {
+ const char D = Path[j];
+ if (D < '0' || D > '9')
+ {
+ return {};
+ }
+ Idx = Idx * 10 + uint64_t(D - '0');
+ }
+ if (!ApplyIndex(Idx))
+ {
+ return {};
+ }
+ i = End + 1;
+ continue;
+ }
+ // Name segment: run until the next '.' or '['.
+ const size_t NameStart = i;
+ while (i < Path.size() && Path[i] != '.' && Path[i] != '[')
+ {
+ ++i;
+ }
+ if (!ApplyName(Path.substr(NameStart, i - NameStart)))
+ {
+ return {};
+ }
+ }
+ return Entered ? Cur : CbFieldView{};
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Primitive rendering. Uses natural string forms for each CbField type.
+ // Non-string values are emitted without quotes — the caller (the JSON-ish
+ // fallback) adds quotes only around string values.
+ //
+
+ void RenderPrimitive(CbFieldView Field, StringBuilderBase& Out)
+ {
+ if (Field.IsString())
+ {
+ Out << Field.AsString();
+ return;
+ }
+ if (Field.IsBool())
+ {
+ Out << (Field.AsBool() ? "true"sv : "false"sv);
+ return;
+ }
+ if (Field.IsInteger())
+ {
+ Out << Field.AsInt64();
+ return;
+ }
+ if (Field.IsFloat())
+ {
+ // format_to into the builder directly — avoids the std::string
+ // fmt::format would otherwise build just to hand to Append.
+ fmt::format_to(StringBuilderAppender(Out), "{}", Field.AsDouble());
+ return;
+ }
+ if (Field.IsDateTime())
+ {
+ Out.Append(Field.AsDateTime().ToIso8601());
+ return;
+ }
+ if (Field.IsObjectId())
+ {
+ // ToString(char[]) writes the 24-char hex into a caller buffer;
+ // the std::string overload would allocate.
+ char Buf[Oid::StringLength + 1] = {};
+ Field.AsObjectId().ToString(Buf);
+ Out << std::string_view(Buf, Oid::StringLength);
+ return;
+ }
+ if (Field.IsHash())
+ {
+ // Appender overload writes the 40-char hex directly into the
+ // builder; the std::string overload would allocate.
+ Field.AsHash().ToHexString(Out);
+ return;
+ }
+ if (Field.IsUuid())
+ {
+ Guid G = Field.AsUuid();
+ G.ToString(Out);
+ return;
+ }
+ if (Field.IsNull())
+ {
+ Out << "null"sv;
+ return;
+ }
+ // Binary / attachment / custom / unknown → emit nothing rather than
+ // a stream of garbage bytes.
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // JSON-ish fallback for bare objects / nested arrays. Compact single-line
+ // with quoted string keys and string values, raw other types. Intended
+ // for debug display — not strictly RFC-8259 JSON.
+ //
+
+ void AppendJsonishString(std::string_view S, StringBuilderBase& Out)
+ {
+ Out << '"';
+ for (char C : S)
+ {
+ switch (C)
+ {
+ case '"':
+ Out << "\\\""sv;
+ break;
+ case '\\':
+ Out << "\\\\"sv;
+ break;
+ case '\n':
+ Out << "\\n"sv;
+ break;
+ case '\r':
+ Out << "\\r"sv;
+ break;
+ case '\t':
+ Out << "\\t"sv;
+ break;
+ default:
+ Out << C;
+ break;
+ }
+ }
+ Out << '"';
+ }
+
+ void AppendJsonishValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Field.IsString())
+ {
+ AppendJsonishString(Field.AsString(), Out);
+ return;
+ }
+ // Non-string leaves and nested objects/arrays go through RenderValue
+ // so object short-circuits ($text / $format / ...) still apply.
+ RenderValue(Field, Out, Localized, Depth);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Value rendering (the decision tree from the plan).
+ //
+
+ void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Depth >= kMaxRecursionDepth)
+ {
+ Out << "…"sv;
+ return;
+ }
+ if (Field.IsObject())
+ {
+ CbObjectView Obj = Field.AsObjectView();
+
+ if (CbFieldView Text = Obj["$text"sv]; Text.IsString())
+ {
+ Out << Text.AsString();
+ return;
+ }
+ if (CbFieldView Format = Obj["$format"sv]; Format.IsString())
+ {
+ RenderTemplateInto(Format.AsString(), Obj, Out, /*Localized=*/false, Depth + 1);
+ return;
+ }
+ if (CbFieldView LocFormat = Obj["$locformat"sv]; LocFormat.IsString())
+ {
+ RenderTemplateInto(LocFormat.AsString(), Obj, Out, /*Localized=*/true, Depth + 1);
+ return;
+ }
+
+ // Bare object — JSON-ish fallback.
+ Out << '{';
+ bool First = true;
+ for (CbFieldView Entry : Obj.CreateViewIterator())
+ {
+ if (!First)
+ {
+ Out << ", "sv;
+ }
+ First = false;
+ AppendJsonishString(Entry.GetName(), Out);
+ Out << ": "sv;
+ AppendJsonishValue(Entry, Out, Localized, Depth + 1);
+ }
+ Out << '}';
+ return;
+ }
+ if (Field.IsArray())
+ {
+ Out << '[';
+ bool First = true;
+ for (CbFieldView Elem : Field.AsArrayView().CreateViewIterator())
+ {
+ if (!First)
+ {
+ Out << ", "sv;
+ }
+ First = false;
+ AppendJsonishValue(Elem, Out, Localized, Depth + 1);
+ }
+ Out << ']';
+ return;
+ }
+ RenderPrimitive(Field, Out);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Template tokenizer + renderer.
+ //
+
+ void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Depth >= kMaxRecursionDepth)
+ {
+ Out << "…"sv;
+ return;
+ }
+
+ size_t i = 0;
+ while (i < Template.size())
+ {
+ const char C = Template[i];
+
+ // Localized escape: ` followed by {, }, or ` → literal.
+ if (Localized && C == '`' && i + 1 < Template.size())
+ {
+ const char Next = Template[i + 1];
+ if (Next == '{' || Next == '}' || Next == '`')
+ {
+ Out << Next;
+ i += 2;
+ continue;
+ }
+ }
+
+ // Non-localized escape: {{ or }} → literal { or }.
+ if (!Localized && C == '{' && i + 1 < Template.size() && Template[i + 1] == '{')
+ {
+ Out << '{';
+ i += 2;
+ continue;
+ }
+ if (!Localized && C == '}' && i + 1 < Template.size() && Template[i + 1] == '}')
+ {
+ Out << '}';
+ i += 2;
+ continue;
+ }
+
+ if (C == '{')
+ {
+ // Placeholder: scan until matching '}'.
+ const size_t End = Template.find('}', i + 1);
+ if (End == std::string_view::npos)
+ {
+ // Unterminated placeholder — emit the rest literally so we
+ // don't silently drop data. UE would have asserted at emit.
+ Out << Template.substr(i);
+ return;
+ }
+ const std::string_view Path = Template.substr(i + 1, End - i - 1);
+ const CbFieldView Resolved = ResolvePath(Fields, Path);
+ if (Resolved)
+ {
+ RenderValue(Resolved, Out, Localized, Depth + 1);
+ }
+ // Missing placeholder: emit nothing. (UE asserts at emit time,
+ // so in well-formed input this never fires.)
+ i = End + 1;
+ continue;
+ }
+
+ Out << C;
+ ++i;
+ }
+ }
+
+} // namespace
+
+void
+RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized)
+{
+ RenderTemplateInto(Template, Fields, Out, Localized, 0);
+}
+
+} // namespace zen
diff --git a/src/zenserver/sessions/logtemplate.h b/src/zenserver/sessions/logtemplate.h
new file mode 100644
index 000000000..e8b07e63d
--- /dev/null
+++ b/src/zenserver/sessions/logtemplate.h
@@ -0,0 +1,42 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/string.h>
+
+#include <string_view>
+
+namespace zen {
+
+/// Render a UE structured-log template (as produced by UE_LOGFMT) against a
+/// fields bag, writing the result into a caller-provided builder. Grammar:
+///
+/// format := (text | escape | placeholder)*
+/// escape := '{{' | '}}' (non-localized, default)
+/// | '`{' | '`}' | '``' (localized, $locformat)
+/// placeholder := '{' field_path '}'
+/// field_path := name ('.' name | '[' digits ']')*
+/// name := [A-Za-z0-9] [A-Za-z0-9_]* (leading '_' reserved)
+///
+/// There are NO inline format specs — `{Name:spec}` is not part of the
+/// grammar. Formatting control lives on the value side via nested
+/// objects carrying `$text` / `$format` / `$locformat` (see the value-
+/// rendering rules in logtemplate.cpp).
+///
+/// Missing paths render as empty (UE asserts these at emit time, so in
+/// practice they don't occur; the empty-render is defensive). Unknown
+/// primitive CbField types render as empty.
+///
+/// Typical use: pass an `ExtendableStringBuilder<256>` so typical messages
+/// render on the stack with no heap allocation. The builder is appended to,
+/// not cleared, so callers can compose multiple writes if they want.
+///
+/// @param Template The format string.
+/// @param Fields The top-level fields bag referenced by placeholders.
+/// @param Out Builder to append the rendered text to.
+/// @param Localized True for $locformat templates (backtick escapes);
+/// false (default) for the top-level `format` field.
+void RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized = false);
+
+} // namespace zen
diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp
index 9d4e3120c..470117c6a 100644
--- a/src/zenserver/sessions/sessions.cpp
+++ b/src/zenserver/sessions/sessions.cpp
@@ -3,59 +3,733 @@
#include "sessions.h"
#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <mutex>
+
namespace zen {
using namespace std::literals;
+namespace {
+
+ // Per-session log file layout:
+ // [LogFileHeader][Record]*
+ // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength].
+ // Records are written in order. A partial trailing record (short write after
+ // crash) is ignored on load.
+ constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG'
+ constexpr uint32_t kLogFileVersion = 1;
+
+#pragma pack(push, 1)
+ struct LogFileHeader
+ {
+ uint32_t Magic;
+ uint32_t Version;
+ };
+#pragma pack(pop)
+
+ constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader);
+
+ void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry)
+ {
+ Writer << "ts" << Entry.Timestamp;
+ if (Entry.Level != logging::Off)
+ {
+ // Store as a small integer (int8_t range) rather than a string —
+ // fixed-width, one byte in the serialized CbObject.
+ Writer << "lvl" << static_cast<int32_t>(Entry.Level);
+ }
+ if (!Entry.LoggerName.empty())
+ {
+ Writer << "cat" << Entry.LoggerName;
+ }
+ if (!Entry.Message.empty())
+ {
+ Writer << "msg" << Entry.Message;
+ }
+ // Structured-log template + fields. Only present for UE_LOGFMT-shaped
+ // entries; Message already holds the rendered text for those so a
+ // reader that ignores these two can still display the line.
+ if (!Entry.Format.empty())
+ {
+ Writer << "fmt" << Entry.Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("flds"sv, Entry.Fields);
+ }
+ }
+ }
+
+ // Parse a serialized record into an input form. The string_views in
+ // the result borrow from `Obj`'s underlying buffer; the caller must
+ // keep that buffer alive (or arena-copy via PreloadEntries) before
+ // the views are used.
+ bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput)
+ {
+ CbFieldView TsField = Obj["ts"sv];
+ if (!TsField)
+ {
+ return false;
+ }
+ OutInput.Timestamp = TsField.AsDateTime();
+
+ // New format: integer. Legacy format (pre-refactor log.bin files on
+ // this same branch): string. Accept either so existing persisted
+ // entries keep their level when loaded.
+ CbFieldView LvlField = Obj["lvl"sv];
+ if (LvlField.IsInteger())
+ {
+ const int32_t Lvl = LvlField.AsInt32();
+ if (Lvl >= 0 && Lvl < logging::LogLevelCount)
+ {
+ OutInput.Level = static_cast<logging::LogLevel>(Lvl);
+ }
+ }
+ else if (LvlField.IsString())
+ {
+ OutInput.Level = logging::ParseLogLevelString(LvlField.AsString());
+ }
+
+ OutInput.LoggerName = Obj["cat"sv].AsString();
+ OutInput.Message = Obj["msg"sv].AsString();
+ OutInput.Format = Obj["fmt"sv].AsString();
+ if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0)
+ {
+ OutInput.Fields = CbObject::Clone(FieldsView);
+ }
+ return true;
+ }
+
+ void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info)
+ {
+ Writer << "id" << Info.Id;
+ if (!Info.AppName.empty())
+ {
+ Writer << "app" << Info.AppName;
+ }
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
+ if (!Info.Platform.empty())
+ {
+ Writer << "plat" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
+ if (Info.JobId != Oid::Zero)
+ {
+ Writer << "jobid" << Info.JobId;
+ }
+ Writer << "ca" << Info.CreatedAt;
+ Writer << "ua" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ea" << Info.EndedAt;
+ }
+ if (Info.Metadata.GetSize() > 0)
+ {
+ Writer.AddObject("meta"sv, Info.Metadata);
+ }
+ }
+
+ bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo)
+ {
+ OutInfo.Id = Obj["id"sv].AsObjectId();
+ if (OutInfo.Id == Oid::Zero)
+ {
+ return false;
+ }
+ OutInfo.AppName = std::string(Obj["app"sv].AsString());
+ OutInfo.Mode = std::string(Obj["mode"sv].AsString());
+ OutInfo.Platform = std::string(Obj["plat"sv].AsString());
+ OutInfo.ClientPid = Obj["pid"sv].AsUInt32();
+ OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId();
+ OutInfo.JobId = Obj["jobid"sv].AsObjectId();
+ OutInfo.CreatedAt = Obj["ca"sv].AsDateTime();
+ OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime();
+ OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0});
+ CbObjectView MetaView = Obj["meta"sv].AsObjectView();
+ if (MetaView.GetSize() > 0)
+ {
+ OutInfo.Metadata = CbObject::Clone(MetaView);
+ }
+ return true;
+ }
+
+ std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); }
+
+#if ZEN_PLATFORM_WINDOWS
+ // Turn a Windows process exit code into a human-friendly termination
+ // reason. Most abnormal terminations surface as NTSTATUS values (high
+ // bit set); the ones below are what you'll actually encounter in the
+ // wild. Anything we don't recognize falls through to a formatted hex
+ // (NTSTATUS-shaped) or decimal (application-level) exit code.
+ std::string DescribeWindowsExitCode(uint32_t ExitCode)
+ {
+ struct Named
+ {
+ uint32_t Code;
+ std::string_view Name;
+ };
+ using namespace std::literals;
+ static constexpr Named kKnown[] = {
+ {0xC000013Au, "interrupted (Ctrl-C)"sv},
+ {0xC0000005u, "access violation"sv},
+ {0xC000001Du, "illegal instruction"sv},
+ {0xC0000094u, "integer divide by zero"sv},
+ {0xC0000096u, "privileged instruction"sv},
+ {0xC00000FDu, "stack overflow"sv},
+ {0xC0000409u, "stack buffer overrun"sv},
+ {0xC0000374u, "heap corruption"sv},
+ {0xC0000135u, "DLL not found"sv},
+ {0xC0000142u, "DLL initialization failed"sv},
+ {0xC000007Bu, "invalid image format"sv},
+ {0xC0000420u, "assertion failure"sv},
+ {0xC0000008u, "invalid handle"sv},
+ {0xC000008Eu, "float divide by zero"sv},
+ {0xC0000091u, "float overflow"sv},
+ {0xC0000093u, "float underflow"sv},
+ {0x80000003u, "breakpoint"sv},
+ {0x40000015u, "fatal app exit"sv},
+ };
+ for (const Named& Entry : kKnown)
+ {
+ if (Entry.Code == ExitCode)
+ {
+ return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode);
+ }
+ }
+ // NTSTATUS-shaped codes have the high bit set; show them as hex so
+ // they're recognizable (and matchable against Microsoft's doc tables).
+ if ((ExitCode & 0x80000000u) != 0)
+ {
+ return fmt::format("process exited (exit code 0x{:08X})", ExitCode);
+ }
+ return fmt::format("process exited (exit code {})", ExitCode);
+ }
+#endif
+
+} // namespace
+
class SessionLog : public TRefCounted<SessionLog>
{
public:
- SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); }
+ explicit SessionLog(std::filesystem::path LogFilePath);
+
+ void Append(const SessionsService::LogEntryInput& Entry);
+
+ // LoadTail returns input-form entries: their string_views borrow
+ // from m_OwnedBuffers (held internally) so the caller's PreloadEntries
+ // can intern/copy them into the session arena before the buffers are
+ // dropped at LoadResult destruction.
+ struct LoadResult
+ {
+ std::vector<SessionsService::LogEntryInput> TailEntries;
+ // Backing memory for the views in TailEntries. Each ParsedRecord
+ // keeps a CbObject alive whose payload bytes back the strings.
+ std::vector<CbObject> OwnedBuffers;
+ uint64_t TotalCount = 0;
+ };
+ LoadResult LoadTail(size_t MaxEntries);
private:
- BasicFile m_LogFile;
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
+ std::filesystem::path m_Path;
+ std::mutex m_Mutex;
+ BasicFile m_File;
+ uint64_t m_WriteOffset = 0;
+ bool m_Enabled = false;
};
-class SessionLogStore
+SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath))
{
-public:
- SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {}
+ std::error_code Ec;
+ std::filesystem::create_directories(m_Path.parent_path(), Ec);
- ~SessionLogStore() = default;
+ m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
- Ref<SessionLog> GetLogForSession(const Oid& SessionId)
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ m_File.Close();
+ ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message());
+ return;
}
- Ref<SessionLog> CreateLogForSession(const Oid& SessionId)
+ LogFileHeader Header{};
+ bool NeedsInit = Size < kLogFileHeaderSize;
+ if (!NeedsInit)
+ {
+ // Read is throwing-only; guard so a read failure doesn't escape.
+ try
+ {
+ m_File.Read(&Header, sizeof(Header), 0);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what());
+ NeedsInit = true;
+ }
+ if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion))
+ {
+ NeedsInit = true;
+ }
+ }
+
+ if (NeedsInit)
+ {
+ m_File.SetFileSize(0);
+ Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion};
+ m_File.Write(&Header, sizeof(Header), 0, Ec);
+ if (Ec)
+ {
+ m_File.Close();
+ ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
+ m_WriteOffset = kLogFileHeaderSize;
+ }
+ else
+ {
+ m_WriteOffset = Size;
+ }
+
+ m_Enabled = true;
+}
+
+void
+SessionLog::Append(const SessionsService::LogEntryInput& Entry)
+{
+ if (!m_Enabled)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ return;
}
+ CbObjectWriter Writer;
+ WriteLogEntryFields(Writer, Entry);
+ CbObject Obj = Writer.Save();
+
+ // Write directly from the CbObject's owned buffer — no need to allocate
+ // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write.
+ const MemoryView View = Obj.GetView();
+ const uint64_t ObjSize = View.GetSize();
+ if (ObjSize == 0 || ObjSize > std::numeric_limits<uint32_t>::max())
+ {
+ return;
+ }
+ const uint32_t Len = static_cast<uint32_t>(ObjSize);
+
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ std::error_code Ec;
+ m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_WriteOffset += sizeof(Len) + ObjSize;
+}
+
+SessionLog::LoadResult
+SessionLog::LoadTail(size_t MaxEntries)
+{
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ LoadResult Result;
+
+ if (!m_Enabled)
+ {
+ return Result;
+ }
+
+ std::error_code Ec;
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec || Size <= kLogFileHeaderSize)
+ {
+ return Result;
+ }
+
+ IoBuffer Buffer;
+ try
+ {
+ Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what());
+ return Result;
+ }
+ const uint8_t* Data = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ const uint64_t DataSize = Buffer.GetSize();
+
+ // Walk all valid record positions, ignoring any partial trailing record.
+ struct RecRef
+ {
+ const uint8_t* Ptr;
+ uint32_t Len;
+ };
+ std::vector<RecRef> Records;
+ uint64_t Pos = 0;
+ while (Pos + sizeof(uint32_t) <= DataSize)
+ {
+ uint32_t RecLen = 0;
+ std::memcpy(&RecLen, Data + Pos, sizeof(RecLen));
+ const uint64_t Next = Pos + sizeof(uint32_t) + RecLen;
+ if (RecLen == 0 || Next > DataSize)
+ {
+ break;
+ }
+ Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen});
+ Pos = Next;
+ }
+
+ // Parse every record so TotalCount reflects how many actually decode
+ // — cursors anchor to this number, and counting CB-corrupt records
+ // would shift subsequent cursor math off. Only the trailing window
+ // of size MaxEntries is materialized into TailEntries; head records
+ // are parsed purely for the count and discarded.
+ const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0;
+ Result.TailEntries.reserve(Records.size() - TailStart);
+ Result.OwnedBuffers.reserve(Records.size() - TailStart);
+
+ for (size_t i = 0; i < Records.size(); ++i)
+ {
+ try
+ {
+ IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len));
+ CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf));
+
+ SessionsService::LogEntryInput Input;
+ if (ReadLogEntry(Obj, Input))
+ {
+ ++Result.TotalCount;
+ if (i >= TailStart)
+ {
+ // Keep the CbObject alive for as long as the views
+ // in the input are needed — PreloadEntries will copy
+ // the strings into the session arena and we drop the
+ // buffer set when LoadResult is destroyed.
+ Result.TailEntries.push_back(std::move(Input));
+ Result.OwnedBuffers.push_back(std::move(Obj));
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ // Skip malformed record — does not contribute to TotalCount.
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class SessionLogStore
+{
+public:
+ explicit SessionLogStore(std::filesystem::path StoragePath);
+
+ Ref<SessionLog> GetOrCreateLogForSession(const Oid& SessionId);
+ void WriteSessionInfoFile(const SessionsService::SessionInfo& Info);
+ void DeleteSession(const Oid& SessionId);
+ uint64_t GetSessionSize(const Oid& SessionId) const;
+
+ struct PersistedSession
+ {
+ SessionsService::SessionInfo Info;
+ std::filesystem::path LogPath;
+ };
+ std::vector<PersistedSession> Scan() const;
+
private:
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
std::filesystem::path m_StoragePath;
};
-SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info)
+SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath))
{
+ std::error_code Ec;
+ std::filesystem::create_directories(m_StoragePath, Ec);
+}
+
+Ref<SessionLog>
+SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+ return Ref(new SessionLog(Dir / "log.bin"));
+}
+
+void
+SessionLogStore::DeleteSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::remove_all(Dir, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message());
+ }
+}
+
+uint64_t
+SessionLogStore::GetSessionSize(const Oid& SessionId) const
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ uint64_t Total = 0;
+ std::filesystem::directory_iterator It{Dir, Ec};
+ if (Ec)
+ {
+ return 0;
+ }
+ for (const std::filesystem::directory_entry& Entry : It)
+ {
+ std::error_code FileEc;
+ if (Entry.is_regular_file(FileEc))
+ {
+ const uintmax_t Size = Entry.file_size(FileEc);
+ if (!FileEc)
+ {
+ Total += uint64_t(Size);
+ }
+ }
+ }
+ return Total;
}
-SessionsService::Session::~Session() = default;
void
-SessionsService::Session::AppendLog(LogEntry Entry)
+SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info)
{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+
+ CbObjectWriter Writer;
+ WriteSessionInfoFields(Writer, Info);
+ CbObject Obj = Writer.Save();
+ const MemoryView View = Obj.GetView();
+ if (View.GetSize() == 0)
+ {
+ return;
+ }
+ TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec);
+}
+
+std::vector<SessionLogStore::PersistedSession>
+SessionLogStore::Scan() const
+{
+ std::vector<PersistedSession> Result;
+ std::error_code Ec;
+
+ if (!std::filesystem::exists(m_StoragePath, Ec))
+ {
+ return Result;
+ }
+
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec})
+ {
+ if (Ec || !Entry.is_directory(Ec))
+ {
+ continue;
+ }
+
+ const std::filesystem::path InfoPath = Entry.path() / "info.cb";
+ const std::filesystem::path LogPath = Entry.path() / "log.bin";
+
+ if (!std::filesystem::exists(InfoPath, Ec))
+ {
+ continue;
+ }
+
+ try
+ {
+ BasicFile InfoFile;
+ std::error_code OpenEc;
+ InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc);
+ if (OpenEc)
+ {
+ continue;
+ }
+ IoBuffer InfoBuf = InfoFile.ReadAll();
+ if (InfoBuf.GetSize() == 0)
+ {
+ continue;
+ }
+ CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf));
+ PersistedSession PS{.Info = SessionsService::SessionInfo{
+ .Id = Oid::Zero,
+ .CreatedAt = DateTime{0},
+ .UpdatedAt = DateTime{0},
+ }};
+ if (!ReadSessionInfo(InfoObj, PS.Info))
+ {
+ continue;
+ }
+ PS.LogPath = LogPath;
+ Result.push_back(std::move(PS));
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what());
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+SessionsService::Session::Session(const SessionInfo& Info, Ref<SessionLog> Log, ProcessHandle ClientProcess)
+: m_Info(Info)
+, m_Log(std::move(Log))
+, m_ClientProcess(std::move(ClientProcess))
+{
+}
+SessionsService::Session::~Session() = default;
+
+const char*
+SessionsService::Session::InternLoggerNameLocked(std::string_view Name)
+{
+ if (Name.empty())
+ {
+ return "";
+ }
+ if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end())
+ {
+ return It->second;
+ }
+ const char* Arena = m_LogArena.DuplicateString(Name);
+ // The map's key view borrows from Arena (which lives as long as the
+ // session does), so it's safe to outlive `Name`.
+ m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena);
+ return Arena;
+}
+
+const char*
+SessionsService::Session::AllocateLogStringLocked(std::string_view Str)
+{
+ if (Str.empty())
+ {
+ return "";
+ }
+ return m_LogArena.DuplicateString(Str);
+}
+
+uint64_t
+SessionsService::Session::AppendLog(LogEntryInput Input)
+{
+ // Persist first (outside the deque lock ordering) so disk I/O doesn't
+ // starve cursor readers holding the shared lock. The SessionLog has its
+ // own internal mutex that serializes file writes.
+ if (m_Log)
+ {
+ m_Log->Append(Input);
+ }
+
RwLock::ExclusiveLockScope Lock(m_LogLock);
- m_LogEntries.push_back(std::move(Entry));
- ++m_TotalAppended;
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ const uint64_t NewCursor = ++m_TotalAppended;
while (m_LogEntries.size() > MaxLogEntries)
{
m_LogEntries.pop_front();
}
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::Session::AppendLogBatch(std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+
+ // Persist first (per-entry; SessionLog's internal mutex serializes
+ // these writes). We do this outside m_LogLock so file I/O doesn't
+ // stall cursor readers.
+ if (m_Log)
+ {
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_Log->Append(Input);
+ }
+ }
+
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ ++m_TotalAppended;
+ }
+ while (m_LogEntries.size() > MaxLogEntries)
+ {
+ m_LogEntries.pop_front();
+ }
+ return m_TotalAppended;
+}
+
+void
+SessionsService::Session::PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount)
+{
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ m_LogEntries.clear();
+ for (const LogEntryInput& Input : Tail)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = Input.Fields;
+ }
+ m_TotalAppended = TotalCount;
}
std::vector<SessionsService::LogEntry>
@@ -118,17 +792,134 @@ SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const
};
}
+uint64_t
+SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input)
+{
+ // Resolve the session without holding any external lock — GetSession
+ // acquires m_Lock shared briefly and returns a ref-counted handle.
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLog(std::move(Input));
+
+ // Fire after Session::m_LogLock is released (inside AppendLog) so the
+ // callback can safely call back into this service (e.g. to resolve
+ // the session again and fetch the delta for its subscribers) without
+ // any nested-lock concerns.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLogBatch(Inputs);
+
+ // One callback fires for the whole batch — subscribers see all
+ // entries in a single delta rather than N separate deltas. Fired
+ // after Session::m_LogLock is released for the same reason as the
+ // single-entry path.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+void
+SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback)
+{
+ m_LogAppendedCallback = std::move(Callback);
+}
+
//////////////////////////////////////////////////////////////////////////
-SessionsService::SessionsService() : m_Log(logging::Get("sessions"))
+SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions"))
{
+ if (StorageRoot.empty())
+ {
+ return;
+ }
+
+ m_SessionLogs = std::make_unique<SessionLogStore>(StorageRoot);
+
+ // Load all previously-persisted sessions as ended sessions. Their log
+ // tails are preloaded into the in-memory deque so the UI can view them.
+ std::vector<SessionLogStore::PersistedSession> Persisted = m_SessionLogs->Scan();
+ for (SessionLogStore::PersistedSession& PS : Persisted)
+ {
+ // Sessions that were active at shutdown need a synthetic ended time so
+ // they sort correctly in the UI.
+ if (PS.Info.EndedAt.GetTicks() == 0)
+ {
+ PS.Info.EndedAt = PS.Info.UpdatedAt;
+ }
+
+ Ref<Session> S = Ref(new Session(PS.Info));
+
+ // Load the log tail (no SessionLog attached — historical sessions do
+ // not receive new appends and the file handle is released here).
+ // PreloadEntries copies/interns each input's strings into the
+ // session's arena, after which Loaded.OwnedBuffers can be
+ // released along with the rest of the LoadResult.
+ SessionLog Reader(PS.LogPath);
+ SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries);
+ S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount);
+
+ m_EndedSessions.push_back(std::move(S));
+ }
+
+ ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot);
}
SessionsService::~SessionsService() = default;
bool
-SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata)
+SessionsService::RegisterSession(const Oid& SessionId,
+ std::string AppName,
+ std::string Mode,
+ std::string Platform,
+ uint32_t ClientPid,
+ const Oid& ParentSessionId,
+ const Oid& JobId,
+ CbObjectView Metadata)
{
+ // Open a process handle eagerly — BEFORE any pid-reuse window opens. On
+ // Windows the handle is tied to the specific process instance, so a
+ // later pid recycle can't fool later liveness checks. Do the syscall
+ // outside the service lock (it's a kernel round-trip). On POSIX this is
+ // effectively a no-op (just stores the pid).
+ ProcessHandle ClientProcess;
+ if (ClientPid != 0)
+ {
+ std::error_code Ec;
+ ClientProcess.Initialize(static_cast<int>(ClientPid), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled",
+ SessionId,
+ ClientPid,
+ Ec.message());
+ }
+ }
+
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
// Log outside the lock scope - InProcSessionLogSink calls back into
// GetSession() which acquires m_Lock shared, so logging while holding
// m_Lock exclusively would deadlock.
@@ -141,35 +932,69 @@ SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std:
}
const DateTime Now = DateTime::Now();
- m_Sessions.emplace(SessionId,
- Ref(new Session(SessionInfo{.Id = SessionId,
- .AppName = AppName,
- .Mode = Mode,
- .JobId = JobId,
- .Metadata = CbObject::Clone(Metadata),
- .CreatedAt = Now,
- .UpdatedAt = Now})));
+ SessionInfo Info{.Id = SessionId,
+ .AppName = AppName,
+ .Mode = Mode,
+ .Platform = Platform,
+ .ClientPid = ClientPid,
+ .ParentSessionId = ParentSessionId,
+ .JobId = JobId,
+ .Metadata = CbObject::Clone(Metadata),
+ .CreatedAt = Now,
+ .UpdatedAt = Now};
+
+ Ref<SessionLog> Log;
+ if (m_SessionLogs)
+ {
+ Log = m_SessionLogs->GetOrCreateLogForSession(SessionId);
+ }
+ m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess))));
+ PersistedInfo = std::move(Info);
+ }
+
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
}
- ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId);
+ // Include the tracked pid so the log makes it obvious whether the client
+ // opted into liveness tracking (and whether our OpenProcess succeeded).
+ // "pid: 0" = no liveness tracking (remote or client didn't report);
+ // "pid: N" with an immediately-prior warning = OpenProcess failed.
+ ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})",
+ SessionId,
+ AppName,
+ Mode,
+ Platform,
+ ClientPid,
+ ParentSessionId,
+ JobId);
return true;
}
bool
SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
-
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
{
- return false;
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ auto It = m_Sessions.find(SessionId);
+ if (It == m_Sessions.end())
+ {
+ return false;
+ }
+
+ It.value()->UpdateMetadata(Metadata);
+ PersistedInfo = It.value()->Info();
}
- It.value()->UpdateMetadata(Metadata);
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
- const SessionInfo& Info = It.value()->Info();
- ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId);
+ ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId);
return true;
}
@@ -178,13 +1003,23 @@ SessionsService::GetSession(const Oid& SessionId) const
{
RwLock::SharedLockScope Lock(m_Lock);
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end())
{
- return {};
+ return It->second;
+ }
+
+ // Fall back to ended sessions so HTTP consumers can fetch logs/metadata
+ // for sessions that have finished (including sessions loaded from disk on
+ // startup as historical ended sessions).
+ for (const Ref<Session>& Ended : m_EndedSessions)
+ {
+ if (Ended->Info().Id == SessionId)
+ {
+ return Ended;
+ }
}
- return It->second;
+ return {};
}
std::vector<Ref<SessionsService::Session>>
@@ -202,10 +1037,14 @@ SessionsService::GetSessions() const
}
bool
-SessionsService::RemoveSession(const Oid& SessionId)
+SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason)
{
- std::string RemovedAppName;
- Oid RemovedJobId;
+ std::string RemovedAppName;
+ Oid RemovedJobId;
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
+ bool Persist = false;
+ Ref<Session> Ended;
+ const DateTime EndTime = DateTime::Now();
{
RwLock::ExclusiveLockScope Lock(m_Lock);
@@ -219,13 +1058,50 @@ SessionsService::RemoveSession(const Oid& SessionId)
RemovedAppName = It.value()->Info().AppName;
RemovedJobId = It.value()->Info().JobId;
- Ref<Session> Ended = It.value();
- Ended->SetEndedAt(DateTime::Now());
- m_EndedSessions.push_back(std::move(Ended));
+ Ended = It.value();
+ Ended->SetEndedAt(EndTime);
+ if (m_SessionLogs)
+ {
+ PersistedInfo = Ended->Info();
+ Persist = true;
+ }
+ m_EndedSessions.push_back(Ended);
m_Sessions.erase(It);
}
+ // Synthetic "Session ended" entry is appended *after* m_Lock is
+ // released. AppendLog hits disk via SessionLog::Append, so doing it
+ // inside the exclusive scope would block every other session lookup
+ // / registration / list while the I/O completes. The callback that
+ // pushes the delta to WS subscribers also fires from outside the
+ // lock — same self-deadlock concern as the GetSession path.
+ uint64_t SyntheticEndCursor = 0;
+ {
+ ExtendableStringBuilder<128> MsgBuilder;
+ MsgBuilder << "Session ended"sv;
+ if (!Reason.empty())
+ {
+ MsgBuilder << ": "sv << Reason;
+ }
+ SyntheticEndCursor = Ended->AppendLog(LogEntryInput{
+ .Timestamp = EndTime,
+ .Level = logging::Info,
+ .LoggerName = "sessions"sv,
+ .Message = MsgBuilder.ToView(),
+ });
+ }
+
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, SyntheticEndCursor);
+ }
+
+ if (Persist)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
+
ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId);
return true;
}
@@ -244,4 +1120,202 @@ SessionsService::GetSessionCount() const
return m_Sessions.size();
}
+// The "when was this session last relevant" timestamp used for age checks
+// and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt.
+static DateTime
+ReferenceTime(const SessionsService::SessionInfo& Info)
+{
+ return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt;
+}
+
+size_t
+SessionsService::CheckProcessLiveness()
+{
+ // Snapshot active sessions under a shared lock, then probe liveness and
+ // drop dead ones without holding the service lock (IsRunning() is a
+ // kernel round-trip and RemoveSession() takes the lock exclusively).
+ std::vector<Ref<Session>> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_Sessions.size());
+ for (const auto& [Id, SessionRef] : m_Sessions)
+ {
+ if (SessionRef->GetClientProcess().IsValid())
+ {
+ Candidates.push_back(SessionRef);
+ }
+ }
+ }
+
+ size_t Ended = 0;
+ for (const Ref<Session>& S : Candidates)
+ {
+ // m_ClientProcess is set once at construction and never mutated, so
+ // reading it here without synchronization is safe.
+ if (!S->GetClientProcess().IsRunning())
+ {
+ // Build the termination reason. On Windows, GetExitCode() returns
+ // the real OS exit code and is cheap right after !IsRunning(); we
+ // map the common NTSTATUS codes (Ctrl-C, access violation, DLL
+ // init failure, …) to human-readable names. On POSIX the exit
+ // code is only populated via Wait() which we never call, so
+ // stick with the plain reason.
+ std::string Reason = "process exited";
+#if ZEN_PLATFORM_WINDOWS
+ const uint32_t ExitCode = static_cast<uint32_t>(S->GetClientProcess().GetExitCode());
+ if (ExitCode != 0)
+ {
+ Reason = DescribeWindowsExitCode(ExitCode);
+ }
+#endif
+ if (RemoveSession(S->Info().Id, Reason))
+ {
+ ++Ended;
+ }
+ }
+ }
+ return Ended;
+}
+
+SessionsService::PruneResult
+SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes)
+{
+ PruneResult Result;
+ std::vector<Oid> ToDelete;
+
+ // Phase 1: age + count pruning (fast; purely in-memory).
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ const uint64_t NowTicks = DateTime::Now().GetTicks();
+ const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0;
+ const DateTime Cutoff{CutoffTicks};
+
+ // Age-based pruning: drop ended sessions whose reference time is
+ // older than Cutoff.
+ auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ if (ReferenceTime(S->Info()) < Cutoff)
+ {
+ ToDelete.push_back(S->Info().Id);
+ return true;
+ }
+ return false;
+ });
+ Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt);
+ m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end());
+
+ // Count-based pruning: keep at most MaxCount sessions total. Active
+ // sessions are never touched; if there are already >= MaxCount active
+ // sessions then all ended sessions get evicted.
+ const size_t ActiveCount = m_Sessions.size();
+ const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0;
+ if (m_EndedSessions.size() > EndedTarget)
+ {
+ const size_t ToRemove = m_EndedSessions.size() - EndedTarget;
+ // Move the `ToRemove` oldest entries to the front.
+ std::partial_sort(
+ m_EndedSessions.begin(),
+ m_EndedSessions.begin() + ToRemove,
+ m_EndedSessions.end(),
+ [](const Ref<Session>& A, const Ref<Session>& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); });
+ for (size_t i = 0; i < ToRemove; ++i)
+ {
+ ToDelete.push_back(m_EndedSessions[i]->Info().Id);
+ }
+ m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove);
+ Result.ExpiredByCount = ToRemove;
+ }
+ }
+
+ // Phase 1 disk deletion, outside the service lock.
+ if (m_SessionLogs)
+ {
+ for (const Oid& Id : ToDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ }
+
+ // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions
+ // (id + reference time) under a shared lock, then stat each directory
+ // outside the lock so we don't hold writers off during filesystem calls.
+ if (!m_SessionLogs)
+ {
+ return Result;
+ }
+
+ struct Candidate
+ {
+ Oid Id;
+ DateTime RefTime;
+ uint64_t Size = 0;
+ };
+ std::vector<Candidate> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_EndedSessions.size());
+ for (const Ref<Session>& S : m_EndedSessions)
+ {
+ Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())});
+ }
+ }
+
+ uint64_t TotalBytes = 0;
+ for (Candidate& C : Candidates)
+ {
+ C.Size = m_SessionLogs->GetSessionSize(C.Id);
+ TotalBytes += C.Size;
+ }
+
+ if (TotalBytes <= MaxStorageBytes)
+ {
+ return Result;
+ }
+
+ // Oldest first so we evict in chronological order.
+ std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; });
+
+ std::vector<Oid> StorageDelete;
+ uint64_t Reclaimed = 0;
+ const uint64_t NeedBytes = TotalBytes - MaxStorageBytes;
+ for (const Candidate& C : Candidates)
+ {
+ if (Reclaimed >= NeedBytes)
+ {
+ break;
+ }
+ StorageDelete.push_back(C.Id);
+ Reclaimed += C.Size;
+ }
+
+ if (StorageDelete.empty())
+ {
+ return Result;
+ }
+
+ // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession
+ // calls between the snapshot and here will have inserted new entries at
+ // the back, which we safely leave alone.
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ tsl::robin_map<Oid, uint8_t, Oid::Hasher> IdSet;
+ for (const Oid& Id : StorageDelete)
+ {
+ IdSet[Id] = 1;
+ }
+ auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ return IdSet.contains(S->Info().Id);
+ });
+ m_EndedSessions.erase(It, m_EndedSessions.end());
+ }
+
+ for (const Oid& Id : StorageDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ Result.ExpiredByStorage = StorageDelete.size();
+
+ return Result;
+}
+
} // namespace zen
diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h
index a84ca6506..a722704e0 100644
--- a/src/zenserver/sessions/sessions.h
+++ b/src/zenserver/sessions/sessions.h
@@ -4,6 +4,8 @@
#include <zencore/compactbinary.h>
#include <zencore/logbase.h>
+#include <zencore/memory/memoryarena.h>
+#include <zencore/process.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -11,7 +13,10 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/deque.h>
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <filesystem>
+#include <functional>
#include <optional>
+#include <span>
#include <string>
#include <vector>
@@ -34,25 +39,62 @@ public:
Oid Id;
std::string AppName;
std::string Mode;
- Oid JobId;
- CbObject Metadata;
- DateTime CreatedAt;
- DateTime UpdatedAt;
- DateTime EndedAt{0};
+ std::string Platform; // Reported by the client, e.g. "windows", "linux", "macos"
+ uint32_t ClientPid = 0; // Non-zero = local PID to probe for liveness. 0 = don't track.
+ Oid ParentSessionId;
+ // Optional task/action identifier used to associate this session with a
+ // specific unit of work. Distinct from ParentSessionId, which records
+ // process/session ancestry.
+ Oid JobId;
+ CbObject Metadata;
+ DateTime CreatedAt;
+ DateTime UpdatedAt;
+ DateTime EndedAt{0};
};
+ /// Stored form of a log entry. The string fields are arena-borrowed
+ /// `const char*` — they live in the owning Session's MemoryArena and
+ /// are valid only for that Session's lifetime. Default copy is
+ /// intentionally shallow (string pointers are shared with the source);
+ /// callers must not let copies outlive the originating Session.
+ ///
+ /// Build entries via `LogEntryInput` and route them through
+ /// `Session::AppendLog` / `AppendLogBatch`, which intern logger names
+ /// and arena-allocate the other strings before storing.
struct LogEntry
{
- DateTime Timestamp;
- std::string Level;
- std::string Message;
- CbObject Data;
+ DateTime Timestamp{0};
+ // Sentinel: Off means "no level set" (e.g. plain-text POSTed entries
+ // where the client didn't include a level). Real log entries use
+ // Trace..Critical, so Off is free to reuse as "omit on serialize".
+ logging::LogLevel Level = logging::Off;
+ // Arena pointers (null-terminated). Empty string is the default
+ // — never null, so callers don't need to guard.
+ const char* LoggerName = ""; // Interned: one canonical copy per unique name across the session.
+ const char* Message = ""; // For structured entries: the rendered form (populated at intake).
+ const char* Format = ""; // UE_LOGFMT template; "" for plain entries.
+ CbObject Fields; // Present only when Format is non-empty.
+ };
+
+ /// Input form used to build an entry on the way into a Session. The
+ /// string_view fields are caller-borrowed; AppendLog interns/copies
+ /// them into the Session's arena before any LogEntry is built. Use
+ /// this struct rather than constructing LogEntry directly so the
+ /// arena ownership invariant stays one-sided.
+ struct LogEntryInput
+ {
+ DateTime Timestamp{0};
+ logging::LogLevel Level = logging::Off;
+ std::string_view LoggerName;
+ std::string_view Message;
+ std::string_view Format;
+ CbObject Fields;
};
class Session : public TRefCounted<Session>
{
public:
- Session(const SessionInfo& Info);
+ Session(const SessionInfo& Info, Ref<SessionLog> Log = {}, ProcessHandle ClientProcess = {});
~Session();
Session(Session&&) = delete;
@@ -67,12 +109,29 @@ public:
void SetEndedAt(DateTime When) { m_Info.EndedAt = When; }
- void AppendLog(LogEntry Entry);
+ /// Appends an entry to the in-memory deque and to the persisted
+ /// log. Returns the new cursor value (m_TotalAppended post-
+ /// increment). Logger name is interned, message and format are
+ /// arena-allocated — the input's string_views may safely be
+ /// caller-stack-bound.
+ uint64_t AppendLog(LogEntryInput Input);
+
+ /// Append-many counterpart that takes the deque lock exactly once
+ /// for the whole batch. Use this when an inbound HTTP POST carries
+ /// multiple entries — single-lock semantics keep entries from one
+ /// caller contiguous on the wire even when other appends race in,
+ /// and the WS-push observer can fire just once for the whole batch.
+ /// Returns the new cursor (the value at the tail of the batch).
+ uint64_t AppendLogBatch(std::span<LogEntryInput> Inputs);
+
std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const;
uint64_t GetLogCount() const;
/// Returns entries appended after the given cursor and the new cursor value.
/// A cursor of 0 returns all entries currently in the deque.
+ /// The returned LogEntries borrow strings from this Session's
+ /// arena — callers must hold a Ref<Session> for as long as they
+ /// keep the result.
struct CursorResult
{
std::vector<LogEntry> Entries;
@@ -81,26 +140,118 @@ public:
};
CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const;
+ // Seed this session with pre-existing log entries (e.g. loaded from disk
+ // on startup). Sets the total-appended counter to reflect what was on
+ // disk so cursors remain meaningful for historical sessions. The inputs
+ // are interned/arena-allocated into this session.
+ void PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount);
+
+ /// Process handle used for client-liveness checks. Acquired at
+ /// registration time (while the pid is known to refer to the reporting
+ /// process) and held for the session's lifetime; on Windows this is a
+ /// real HANDLE tied to the specific process instance and is immune to
+ /// pid reuse. Invalid (IsValid() == false) for remote sessions or when
+ /// OpenProcess() failed. Set once at construction — no synchronization
+ /// needed for readers.
+ const ProcessHandle& GetClientProcess() const { return m_ClientProcess; }
+ ProcessHandle& GetClientProcess() { return m_ClientProcess; }
+
+ static constexpr uint32_t MaxLogEntries = 10000;
+
private:
+ // Intern a logger name into m_LogArena and return the canonical
+ // pointer for that name. Subsequent calls with the same string
+ // return the same pointer. Caller must hold m_LogLock exclusive.
+ const char* InternLoggerNameLocked(std::string_view Name);
+
+ // Allocate a copy of Str into m_LogArena and return a null-
+ // terminated pointer. No deduplication. Caller must hold m_LogLock
+ // exclusive. Empty input returns "" (no allocation).
+ const char* AllocateLogStringLocked(std::string_view Str);
+
SessionInfo m_Info;
Ref<SessionLog> m_Log;
+ ProcessHandle m_ClientProcess;
mutable RwLock m_LogLock;
eastl::deque<LogEntry> m_LogEntries;
uint64_t m_TotalAppended = 0; // monotonically increasing counter
-
- static constexpr uint32_t MaxLogEntries = 10000;
+ // String storage for the in-memory deque. LoggerName is interned
+ // (one canonical copy per unique name); Message and Format are
+ // duplicated per entry. Both die with the Session — so the
+ // LogEntry pointers do too. Sized to fit a typical session's
+ // strings in one chunk; spills to additional chunks otherwise.
+ MemoryArena m_LogArena{4096};
+ tsl::robin_map<std::string_view, const char*> m_InternedLoggerNames;
};
- SessionsService();
+ /// Construct a SessionsService. If StorageRoot is non-empty, session
+ /// metadata and logs are persisted under that directory (one subdirectory
+ /// per session id) and previously-persisted sessions are loaded as ended.
+ explicit SessionsService(std::filesystem::path StorageRoot = {});
~SessionsService();
- bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata);
- bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
- Ref<Session> GetSession(const Oid& SessionId) const;
+ bool RegisterSession(const Oid& SessionId,
+ std::string AppName,
+ std::string Mode,
+ std::string Platform,
+ uint32_t ClientPid,
+ const Oid& ParentSessionId,
+ const Oid& JobId,
+ CbObjectView Metadata);
+ bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
+ Ref<Session> GetSession(const Oid& SessionId) const;
std::vector<Ref<Session>> GetSessions() const;
std::vector<Ref<Session>> GetEndedSessions() const;
- bool RemoveSession(const Oid& SessionId);
- uint64_t GetSessionCount() const;
+ /// Ends a session. If Reason is non-empty, a synthetic log line is
+ /// appended to the session log before it's moved to ended so the
+ /// historical log has a clear closing event.
+ bool RemoveSession(const Oid& SessionId, std::string_view Reason = {});
+ uint64_t GetSessionCount() const;
+
+ /// Appends a log entry to `SessionId` and, if the session exists,
+ /// invokes the log-appended callback with the new cursor so downstream
+ /// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta
+ /// without polling. Returns the new cursor, or 0 if the session is
+ /// unknown. Fires the callback AFTER any internal locks are released
+ /// so the callback can safely call back into this service.
+ uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input);
+
+ /// Batch counterpart of AppendLog. Atomic with respect to other
+ /// appends to the same session — entries land contiguously on the
+ /// wire and persist in order — and fires exactly one push-callback
+ /// for the whole batch. Empty batches and unknown sessions are
+ /// no-ops returning 0.
+ uint64_t AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs);
+
+ /// Observer fired after an entry is appended to any session. Replaces
+ /// any previously set callback. Pass {} to clear. Only one listener is
+ /// supported — the single consumer today is the HTTP WebSocket push.
+ using LogAppendedCallback = std::function<void(const Oid& SessionId, uint64_t NewCursor)>;
+ void SetLogAppendedCallback(LogAppendedCallback Callback);
+
+ /// Drop ended sessions that are too old, that push us over the count
+ /// limit, or that push the on-disk footprint over the byte budget, and
+ /// delete their persisted directories. Active sessions are never
+ /// pruned. Returns the number removed by each criterion.
+ struct PruneResult
+ {
+ size_t ExpiredByAge = 0;
+ size_t ExpiredByCount = 0;
+ size_t ExpiredByStorage = 0;
+ };
+ PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes);
+
+ /// End any active session whose tracked client process is no longer
+ /// running. Sessions with an invalid ProcessHandle (remote, or
+ /// OpenProcess failed at registration) are skipped. Returns the number
+ /// of sessions ended by this pass.
+ size_t CheckProcessLiveness();
+
+ // Tuning defaults. Expressed in whole days / bytes so they're easy to
+ // override from a future command-line flag without touching internals.
+ static constexpr int kDefaultMaxSessionAgeDays = 365;
+ static constexpr size_t kDefaultMaxSessionCount = 1000;
+ static constexpr uint64_t kDefaultMaxStorageBytes = 50ull * 1024 * 1024; // 50 MiB
private:
LoggerRef& Log() { return m_Log; }
@@ -110,6 +261,10 @@ private:
tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions;
std::vector<Ref<Session>> m_EndedSessions;
std::unique_ptr<SessionLogStore> m_SessionLogs;
+ // Set once at wiring-time (single consumer), never reassigned while
+ // hot, so no dedicated lock — just a plain member. Copy-on-call
+ // guards against the theoretical re-register race below.
+ LogAppendedCallback m_LogAppendedCallback;
};
} // namespace zen