aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/httpsessions.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-05-05 15:47:48 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 15:47:48 +0200
commit01286c6233347d561064fc9e6cf9deaf2087ceb7 (patch)
treebdbfdf01725baa2d2dd3d73727e6506b41421dff /src/zenserver/sessions/httpsessions.cpp
parenthub async s3 client (#1024) (diff)
downloadarchived-zen-main.tar.xz
archived-zen-main.zip
sessions: persist to disk, prune, track client liveness, accept UE_LOGFMT (#1014)HEADmain
Branch started as a sessions-service overhaul (persistence, client liveness, UE_LOGFMT intake) and grew to pick up adjacent infrastructure work: an early-startup log backlog, a hardened `MemoryArena`, the `zen trace serve` viewer gaining a counter view + compact timeline + tabbed callsite panel, defensive fixes in the third-party `tourist` trace parser, a series of allocation reductions across the HTTP and compact-binary hot paths, and a new `zen sessions` CLI command tree. ## Sessions service **Persistence.** Each session lives on disk under `<DataRoot>/sessions/<id>/` as `info.cb` (metadata) plus `log.bin` (length-prefixed CbObject log records). On startup the service scans that directory and loads prior sessions as ended sessions, preloading the tail of each log so historical views work after a restart. `SessionLog` is noexcept-constructed and falls back to a disabled state on disk errors, so a bad disk can't take down `RegisterSession`. `GetSession` falls back to the ended-sessions list (fixes historical log fetches over HTTP). `LoadTail` counts only successfully-parsed records. **Pruning.** Periodic cleanup task drops ended sessions once any of three caps is exceeded: age (default 1 year), count (default 1000), or total on-disk footprint (default 50 MiB). Runs 30 s after startup, hourly thereafter. Active sessions never pruned; disk removal and directory stat happen outside the exclusive lock so a slow filesystem can't stall lookups. **Client liveness.** Sessions carry a `ProcessHandle` for the client-reported pid, captured at registration time so Windows pid recycling can't produce false positives. A 30 s asio timer probes liveness and ends dead sessions through the normal remove path, producing a synthetic `Session ended: process exited (...)` line persisted to `log.bin`. Windows decodes common NTSTATUS exit codes to human names (Ctrl-C, access violation, stack overflow, ...); POSIX stays at plain `process exited`. Clients auto-fill `ClientPid` only for local targets (unix socket / loopback); the server defensively accepts pids only from `IsLocalMachineRequest()` peers. zenserver also reports its own pid when registering its self-session, so it shows up with a real pid in the dashboard and `zen sessions ls`. **Synthetic end-of-session line.** `RemoveSession` takes an optional reason; before the session moves to the ended list it appends an Info-level `Session ended[: reason]` entry through the normal log path (released outside `m_Lock`). Current reasons: `client request` (HTTP DELETE), `server shutdown` (self-session), `process exited (...)` (liveness). **UE_LOGFMT structured entries.** `POST /sessions/{id}/log` now accepts `{level, logger, format, fields}` alongside the existing `{level, logger, message}` shape. New `logtemplate.{h,cpp}` implements UE's `StructuredLog.cpp` template grammar (field paths with `.name` / `[N]`, `{{`/`}}` escapes, `$text` / `$format` / `$locformat` object conventions, bounded recursion). Renders to a displayable message at intake while persisting raw format + fields so a future UI can drill into fields without another schema bump. Hot path is zero-alloc — renders into `ExtendableStringBuilder<256>` using stack-buffered `Oid::ToString` / `IoHash::ToHexString` overloads. UI shows a `{…}` marker with the raw template + JSON-pretty fields on hover. **Parent sessions.** `SessionInfo` gains `parent_session_id`; hub-managed storage server child processes inherit the hub's session id via `--parent-session=<id>`. `ZEN_SESSIONS_URL` env var becomes a fallback for `--sessions-url` / config when neither is provided. The in-process session log sink is disabled when a remote sessions target is configured (logs flow through `SessionsServiceClient` instead). The sessions UI groups child sessions under their parent (collapsible/expandable, sorts as a unit, supports nesting). **Platform reporting.** `SessionInfo` gains `Platform`, flowed end-to-end: client auto-fills via `GetRuntimePlatformName()`, server persists in `info.cb` (`plat`) and emits on GET. UI renders as a SimpleIcons-style inline SVG (windows / macOS / iOS / linux / wine / android / playstation / xbox / nintendo) with case-insensitive alias resolution (Win32/Win64, PS4/PS5, XSX/XSS, NintendoSwitch, iPhone/iPad, Darwin/OSX). Unknown values fall back to text; sorting runs on the underlying string. **WebSocket log streaming.** Sessions UI moves from 2 s polling to a WebSocket push model. New `WsSubscriber` has a stable id + helper methods. UI caps the log-line DOM at 5 000 entries with a shared cursor-regression helper, factored out of two call sites. Per-broadcast allocations trimmed on the push path; fixed a stack overrun in the WS log broadcast hex-id buffer. **Log memory.** `LogEntry::Level` is now `logging::LogLevel` (1 byte) instead of `std::string` (~32 B) — saves ~310 KB per full 10 k-entry deque and eliminates a per-message allocation in the in-proc sink. On-disk format writes an int32 and accepts either int or legacy string on read. `LogEntry` strings now live in a `MemoryArena`; logger names are interned across the deque. `SessionLog::Append` and `WriteSessionInfoFile` drop their `UniqueBuffer` round-trip and write `CbObject::GetView()` straight through `BasicFile` / `SafeWriteFile`. Multi-entry `POST /log` batched under one lock + one push. **In-proc log timestamps.** `InProcSessionLogSink::TimePointToDateTime` previously preserved only whole seconds, so every in-proc entry rendered at `.000` ms in the dashboard and `zen sessions tail`. It now adds the sub-second part (nanoseconds → 100 ns ticks) to keep ms precision end-to-end. **UI.** Side "Session Details" panel is gone — its info is inline in the table (appname, mode, platform, id, timestamps, this/log pills, active dot). Bottom panel is a tabbed `Log | Metadata` view with a right-side "Session Information" panel beside metadata; log-only controls (filter, newest-first, follow, log-level filter, expand/collapse) hide when Metadata is active, polling keeps running across tab switches. Wide-mode toggle fills the viewport edge-to-edge. Log lines show the logger category; timestamps render in 24 h with zero-padded fields regardless of locale. Sessions list defaults to All / 10 per page / created-desc, gains click-to-sort headers on the full dataset, a header filter box, and a pager aligned to the table's right edge. Duplicate auto-injected `<h1>Sessions</h1>` removed. ## `zen sessions` CLI New command tree on the `zen` client for inspecting the sessions service from the terminal: - **`zen sessions ls`** — lists sessions (active first, ended next; newest-first within each group) with id, status, app/mode, pid, created, duration, and log count. Supports `--status active|ended|all` (default `all`). - **`zen sessions status`** — prints the sessions service summary: self id, active / ended counts, and the read/write/delete/list/request/bad-request counters from `/stats/sessions`. - **`zen sessions tail [session]`** — tails a session's log. With no argument it tails zenserver's own session (resolved via `/sessions/list`'s `self_id`); an explicit 24-hex id targets any session, including ended ones (historical replay). `--lines N` (default 50, 0 = all buffered) trims the initial dump client-side. `--follow` prefers a WebSocket push subscription on `/sessions/ws` for sub-second latency; on upgrade failure (older server, blocked port, unix-socket transport) it falls back to HTTP cursor polling at `--interval-ms` (default 500), with sleeps chunked to 50 ms so Ctrl-C reacts quickly. Output matches `zen::logging::FullFormatter` (`[YY-MM-DD HH:MM:SS.mmm] [lvl] [logger] message`); on a TTY the level is colored and the logger is bold, with continuation lines indented under the message column using the *visible* prefix width. 404 surfaces as `(session ended)` and connection errors as `(server gone)` — both clean exits, so stopping the server mid-tail no longer prints a stack trace. - **`zen sessions ui`** — opens `<host>/dashboard/?page=sessions` in the user's default browser. Rejects unix-socket hosts. A small `ZenServiceClient::IsUnixSocket()` helper now wraps the unix-socket check used by `ui`, `sessions tail` (WS path), and `sessions ui`. ## Logging `BacklogSink` captures early-startup log entries in a fixed-capacity ring so late-attached sinks (session sink, file sink) can replay them. Detaches from the broadcast list when disabled; backed by destructor-only cleanup (no `unique_ptr` indirection per entry). Tuned defaults so the backlog covers typical bring-up without unbounded growth. ## `zen trace serve` viewer - Compact timeline mode for high-density views. - New `TRACE_INT_VALUE` / `TRACE_FLOAT_VALUE` counter trace points + a counters page in the viewer. - Callsite tables collapsed into a single tabbed panel. - Lossless `Oid <-> Guid` bridge for trace session ids; trace `SessionId` plumbed through. - `tourist` parser hardening: bounds-check `BufferStream::read`, validate `Type::info_size` before `patch()`, convert `parse_important_aux` to a loop (avoids deep recursion), widen `ParserPool` index to `uint32`, bounds-check field offsets in the dispatcher, pin `Types::parse` buffer up-front. ## `MemoryArena` Configurable chunk size, inline chunk list, oversize requests routed to truly-dedicated chunks (no slack waste, no fragmentation when one allocation is much larger than the chunk). ## Allocation cleanups across hot paths - `zenhttp::HttpRequestRouter::HandleRequest` and `FormatPackageMessageInternal`: drop heap allocations. - Compact-binary validation: `eastl::fixed_vector` + `eastl::sort`; eliminate `std::vector` churn. - `zenserverprocess`: trim transient allocations in spawn paths. - Sessions HTTP intake / broadcast: drop transient `std::string` allocs.
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
-rw-r--r--src/zenserver/sessions/httpsessions.cpp504
1 files changed, 460 insertions, 44 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 88db36828..1678ede60 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -7,8 +7,17 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/trace.h>
+#include "logtemplate.h"
#include "sessions.h"
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_list.h>
+#include <EASTL/fixed_vector.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <limits>
+
namespace zen {
using namespace std::literals;
@@ -21,13 +30,21 @@ HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
, m_StatsService(StatsService)
, m_Sessions(Sessions)
, m_PushTimer(IoContext)
+, m_CleanupTimer(IoContext)
+, m_LivenessTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ // Break the callback edge before tearing anything else down so a
+ // late AppendLog on another thread can't fire BroadcastLogAppended
+ // after our subscriber list is gone.
+ m_Sessions.SetLogAppendedCallback({});
m_PushTimer.cancel();
+ m_CleanupTimer.cancel();
+ m_LivenessTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -135,12 +152,36 @@ HttpSessionsService::Initialize()
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+ // Event-driven log push: the service fires this every time an entry
+ // is appended (including the synthetic "session ended" line emitted
+ // by RemoveSession). Subscribers receive a binary CB frame carrying
+ // the delta. Safe to call BroadcastLogAppended from any thread — it
+ // does its own locking and SendBinary is async-queued by the WS
+ // transport.
+ m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); });
+
EnqueuePushTimer();
+
+ // Run a cleanup pass shortly after startup so freshly-loaded historical
+ // data is pruned even if the server doesn't stay up for an hour.
+ m_CleanupTimer.expires_after(std::chrono::seconds(30));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+
+ EnqueueLivenessTimer();
}
static void
-WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
+WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session)
{
+ const SessionsService::SessionInfo& Info = Session.Info();
+
Writer << "id" << Info.Id;
if (!Info.AppName.empty())
{
@@ -150,6 +191,18 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "mode" << Info.Mode;
}
+ if (!Info.Platform.empty())
+ {
+ Writer << "platform" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
@@ -161,6 +214,11 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
Writer << "ended_at" << Info.EndedAt;
}
+ if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0)
+ {
+ Writer << "log_count" << LogCount;
+ }
+
if (Info.Metadata.GetSize() > 0)
{
Writer.AddObject("metadata"sv, Info.Metadata);
@@ -182,13 +240,13 @@ HttpSessionsService::BuildSessionListResponse()
for (const Ref<SessionsService::Session>& Session : Active)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
for (const Ref<SessionsService::Session>& Session : Ended)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -231,7 +289,7 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -262,24 +320,51 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
{
CbObject RequestObject = ServerRequest.ReadPayloadObject();
+ // Render the id into a stack buffer once for any success-reply
+ // paths below — avoids a std::string per POST/PUT.
+ char IdBuf[Oid::StringLength + 1] = {};
+ SessionId.ToString(IdBuf);
+ const std::string_view IdStr(IdBuf, Oid::StringLength);
+
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
std::string Mode(RequestObject["mode"sv].AsString());
- Oid JobId = RequestObject["jobid"sv].AsObjectId();
- CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+ std::string Platform(RequestObject["platform"sv].AsString());
+ Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId();
+ Oid JobId = RequestObject["jobid"sv].AsObjectId();
+ CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+
+ // Only trust a client-reported pid when the HTTP layer
+ // says the request is local (unix socket or a loopback
+ // TCP peer). A remote client's pid refers to a different
+ // machine's process table — opening a local handle with
+ // it would at best be meaningless, at worst a liveness
+ // false positive.
+ uint32_t ClientPid = 0;
+ if (ServerRequest.IsLocalMachineRequest())
+ {
+ ClientPid = RequestObject["pid"sv].AsUInt32();
+ }
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId,
+ std::move(AppName),
+ std::move(Mode),
+ std::move(Platform),
+ ClientPid,
+ ParentSessionId,
+ JobId,
+ MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr);
}
else
{
// Already exists - try update instead
if (m_Sessions.UpdateSession(SessionId, MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError);
}
@@ -290,7 +375,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionWriteCount++;
if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView()))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
HttpContentType::kText,
@@ -304,7 +389,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
if (Session)
{
CbObjectWriter Response;
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -312,7 +397,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kDelete:
{
m_SessionsStats.SessionDeleteCount++;
- if (m_Sessions.RemoveSession(SessionId))
+ if (m_Sessions.RemoveSession(SessionId, "client request"sv))
{
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -334,17 +419,33 @@ static void
WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry)
{
Writer << "timestamp" << Entry.Timestamp;
- if (!Entry.Level.empty())
+ if (Entry.Level != logging::Off)
{
- Writer << "level" << Entry.Level;
+ // Frontend renders on the string form (CSS class derives from it), so
+ // keep the wire format as the canonical lowercase name.
+ Writer << "level" << logging::ToString(Entry.Level);
}
- if (!Entry.Message.empty())
+ const std::string_view LoggerName{Entry.LoggerName};
+ if (!LoggerName.empty())
{
- Writer << "message" << Entry.Message;
+ Writer << "logger" << LoggerName;
}
- if (Entry.Data.GetSize() > 0)
+ const std::string_view Message{Entry.Message};
+ if (!Message.empty())
{
- Writer.AddObject("data"sv, Entry.Data);
+ Writer << "message" << Message;
+ }
+ // Structured-log form alongside the rendered message so a future UI
+ // can offer field-level drill-down without another schema bump. The
+ // existing UI only looks at "message" and is unaffected.
+ const std::string_view Format{Entry.Format};
+ if (!Format.empty())
+ {
+ Writer << "format" << Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("fields"sv, Entry.Fields);
+ }
}
}
@@ -378,12 +479,21 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (ServerRequest.RequestContentType() == HttpContentType::kText)
{
- // Raw text - split by newlines, one entry per line
+ // Raw text - split by newlines, one entry per line. Collect
+ // into a batch and append atomically: keeps a single client's
+ // payload contiguous on the wire even when other clients race
+ // in, and fires the WS push observer just once for the whole
+ // batch instead of once per line.
IoBuffer Payload = ServerRequest.ReadPayload();
std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
const DateTime Now = DateTime::Now();
- size_t Pos = 0;
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views point into the request payload
+ // (Text), which lives for the duration of this handler.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+ size_t Pos = 0;
while (Pos < Text.size())
{
size_t End = Text.find('\n', Pos);
@@ -401,60 +511,115 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (!Line.empty())
{
- Session->AppendLog(SessionsService::LogEntry{
+ Batch.push_back(SessionsService::LogEntryInput{
.Timestamp = Now,
- .Message = std::string(Line),
+ .Message = Line,
});
}
Pos = End + 1;
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
else
{
- // Structured log (JSON or CbObject)
- // Accepts a single record or an "entries" array of records
+ // Structured log (JSON or CbObject). Accepts a single record
+ // or an "entries" array of records — collect into a batch so
+ // a single POST lands atomically and fires one WS push.
CbObject RequestObject = ServerRequest.ReadPayloadObject();
const DateTime Now = DateTime::Now();
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views borrow from the parsed
+ // RequestObject's underlying buffer (the logger / message /
+ // format strings on the wire); we keep RequestObject alive
+ // for the whole intake.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+
+ // Stable backing for messages we render from a structured
+ // template. fixed_list never moves nodes on insertion, so
+ // string_views into these strings stay valid until the list
+ // is destroyed at handler exit. 64 inline nodes match the
+ // batch's fixed-vector inline cap; spills to heap if a POST
+ // brings more.
+ eastl::fixed_list<std::string, 64> RenderedMessages;
+
auto AppendFromObject = [&](CbObjectView Obj) {
- CbFieldView LevelField = Obj["level"sv];
- std::string_view Level;
+ CbFieldView LevelField = Obj["level"sv];
+ logging::LogLevel Level = logging::Off;
if (LevelField.IsString())
{
- Level = LevelField.AsString();
+ Level = logging::ParseLogLevelString(LevelField.AsString());
}
else if (LevelField.IsInteger())
{
int32_t LevelInt = LevelField.AsInt32();
if (LevelInt >= 0 && LevelInt < logging::LogLevelCount)
{
- Level = logging::ToString(static_cast<logging::LogLevel>(LevelInt));
+ Level = static_cast<logging::LogLevel>(LevelInt);
}
}
- std::string Message(Obj["message"sv].AsString());
- CbObjectView DataView = Obj["data"sv].AsObjectView();
-
- Session->AppendLog(SessionsService::LogEntry{
- .Timestamp = Now,
- .Level = std::string(Level),
- .Message = std::move(Message),
- .Data = CbObject::Clone(DataView),
+ const std::string_view LoggerName = Obj["logger"sv].AsString();
+
+ // Two entry shapes. Structured entries carry `format` +
+ // `fields` and no `message` — we render the template right
+ // here so the rest of the pipeline (in-memory deque,
+ // persisted log.bin, UI GET response) keeps working the
+ // same way for both shapes.
+ CbFieldView FormatField = Obj["format"sv];
+ if (FormatField.IsString())
+ {
+ const std::string_view FormatView = FormatField.AsString();
+ CbObjectView FieldsView = Obj["fields"sv].AsObjectView();
+ ExtendableStringBuilder<256> RenderedBuilder;
+ RenderLogTemplate(FormatView, FieldsView, RenderedBuilder);
+
+ // Anchor the rendered string in the stable list so the
+ // LogEntryInput's view into it stays valid until the
+ // AppendLogBatch call below.
+ RenderedMessages.emplace_back(RenderedBuilder.ToView());
+ const std::string& StoredRendered = RenderedMessages.back();
+
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = StoredRendered,
+ .Format = FormatView,
+ .Fields = CbObject::Clone(FieldsView),
+ });
+ return;
+ }
+
+ // Plain entry.
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = Obj["message"sv].AsString(),
});
};
CbFieldView EntriesField = RequestObject["entries"sv];
if (EntriesField.IsArray())
{
- for (CbFieldView Entry : EntriesField)
+ // Pre-reserve so the 50-ish entries from a typical
+ // SendLogBatch don't trigger 4-5 reallocations as the
+ // vector grows.
+ CbArrayView Arr = EntriesField.AsArrayView();
+ Batch.reserve(Arr.Num());
+ for (CbFieldView Entry : Arr)
{
AppendFromObject(Entry.AsObjectView());
}
}
else
{
+ Batch.reserve(1);
AppendFromObject(RequestObject);
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
return ServerRequest.WriteResponse(HttpResponseCode::OK);
@@ -547,13 +712,78 @@ HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::s
{
ZEN_UNUSED(RelativeUri);
ZEN_INFO("Sessions WebSocket client connected");
- m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+ const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed);
+ m_WsConnectionsLock.WithExclusiveLock(
+ [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); });
}
void
-HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg)
{
- // No client-to-server messages expected
+ // Expected client→server protocol is JSON text frames; see
+ // sessions.js → _ws_send. Binary frames and malformed JSON are logged
+ // at debug and ignored so a confused client can't disturb others.
+ if (Msg.Opcode != WebSocketOpcode::kText)
+ {
+ return;
+ }
+ std::string_view PayloadText(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize());
+ std::string ParseError;
+ json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError);
+ if (!ParseError.empty() || !Parsed.is_object())
+ {
+ ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError);
+ return;
+ }
+
+ const std::string& Type = Parsed["type"].string_value();
+ if (Type == "sub_log")
+ {
+ const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value());
+ if (SessionId == Oid::Zero)
+ {
+ ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value());
+ return;
+ }
+ // json11 reports int via int_value() (32-bit); cursors fit easily
+ // inside a session's lifetime so this is fine for the foreseeable
+ // future. Negative values are treated as 0.
+ const int CursorRaw = Parsed["cursor"].int_value();
+ const uint64_t Cursor = CursorRaw > 0 ? static_cast<uint64_t>(CursorRaw) : 0;
+
+ // Record the subscription and fire an immediate delta so we don't
+ // drop entries that landed between the client's HTTP replay and
+ // this frame. See BroadcastLogAppended for the broadcast flow.
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.SubscribedSessionId = SessionId;
+ Sub.LastSentCursor = Cursor;
+ break;
+ }
+ }
+ });
+ // Pass UINT64_MAX to force a flush even if the cursor hasn't
+ // advanced — the subscriber's LastSentCursor may already lag the
+ // tail (e.g. rapid posts before the client subscribed).
+ BroadcastLogAppended(SessionId, std::numeric_limits<uint64_t>::max());
+ }
+ else if (Type == "unsub_log")
+ {
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.Unsubscribe();
+ break;
+ }
+ }
+ });
+ }
+ // Unknown types are silently ignored so the protocol can grow.
}
void
@@ -561,8 +791,8 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
{
ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
m_WsConnectionsLock.WithExclusiveLock([&] {
- auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
- return C.Get() == &Conn;
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) {
+ return Sub.Connection.Get() == &Conn;
});
m_WsConnections.erase(It, m_WsConnections.end());
});
@@ -571,8 +801,15 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
void
HttpSessionsService::BroadcastSessions()
{
- std::vector<Ref<WebSocketConnection>> Connections;
- m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+ // 8 inline slots covers any realistic number of concurrent UI tabs;
+ // spills to heap beyond that.
+ eastl::fixed_vector<Ref<WebSocketConnection>, 8> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] {
+ for (const WsSubscriber& Sub : m_WsConnections)
+ {
+ Connections.push_back(Sub.Connection);
+ }
+ });
if (Connections.empty())
{
@@ -593,6 +830,107 @@ HttpSessionsService::BroadcastSessions()
}
void
+HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor)
+{
+ Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId);
+ if (!Session)
+ {
+ // Session vanished (e.g. pruned) between the append and the
+ // broadcast. No entries to ship.
+ return;
+ }
+
+ // Claim each subscriber's cursor and snapshot its delta atomically under
+ // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather
+ // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the
+ // race where two concurrent BroadcastLogAppended calls would both
+ // observe the same FromCursor, fetch overlapping ranges, and ship the
+ // subscriber duplicate entries. Sends still happen after the lock is
+ // released to avoid holding it across async socket I/O.
+ struct PendingSend
+ {
+ Ref<WebSocketConnection> Connection;
+ SessionsService::Session::CursorResult Delta;
+ bool InitialSend; // true when FromCursor == 0
+ };
+ // 8 inline slots keeps the broadcast allocation-free for the typical UI
+ // case (1-2 tabs tailing one session); spills to heap if many clients
+ // happen to subscribe to the same session at once.
+ eastl::fixed_vector<PendingSend, 8> Sends;
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (!Sub.IsSubscribedTo(SessionId))
+ {
+ continue;
+ }
+ // Cheap gate: if the subscriber already has everything up to
+ // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush.
+ if (NewCursor != std::numeric_limits<uint64_t>::max() && Sub.LastSentCursor >= NewCursor)
+ {
+ continue;
+ }
+ if (!Sub.Connection->IsOpen())
+ {
+ continue;
+ }
+ const uint64_t FromCursor = Sub.LastSentCursor;
+ SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor);
+ Sub.LastSentCursor = Delta.Cursor;
+ Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0});
+ }
+ });
+ if (Sends.empty())
+ {
+ return;
+ }
+
+ // Render the hex id into a stack buffer — CbWriter only needs a
+ // string_view, so we avoid the 24-byte std::string allocation that
+ // Oid::ToString() would otherwise do on every broadcast. The buffer
+ // is StringLength + 1 because ToString writes a trailing NUL beyond
+ // the 24 hex chars; the view itself excludes the NUL.
+ char HexSessionIdBuf[Oid::StringLength + 1];
+ SessionId.ToString(HexSessionIdBuf);
+ const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength);
+ for (const PendingSend& Send : Sends)
+ {
+ if (Send.Delta.Entries.empty() && !Send.InitialSend)
+ {
+ // Nothing new and the subscriber was primed — nothing to send.
+ continue;
+ }
+
+ // Binary CB frame — the client already has a CB parser
+ // (util/compactbinary.js). CB keeps structured entries typed end-
+ // to-end (hashes, ints, dates stay that way on the wire) and skips
+ // JSON escaping overhead on every append. Shape mirrors the HTTP
+ // GET response plus two routing fields (type + session). A fresh
+ // CbObjectWriter per iteration is required because the ctor calls
+ // BeginObject() to set up the implicit outer object — Save() then
+ // finalizes that object, leaving the writer in a state that
+ // Reset() doesn't restore.
+ CbObjectWriter Response;
+ Response << "type"sv
+ << "log"sv;
+ Response << "session"sv << HexSessionId;
+ Response << "cursor"sv << Send.Delta.Cursor;
+ Response << "count"sv << Send.Delta.Count;
+ Response.BeginArray("entries"sv);
+ for (const SessionsService::LogEntry& Entry : Send.Delta.Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ CbObject Obj = Response.Save();
+ Send.Connection->SendBinary(Obj.GetView());
+ }
+}
+
+void
HttpSessionsService::EnqueuePushTimer()
{
m_PushTimer.expires_after(std::chrono::seconds(2));
@@ -607,4 +945,82 @@ HttpSessionsService::EnqueuePushTimer()
});
}
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic cleanup of expired / excess sessions
+//
+
+void
+HttpSessionsService::RunCleanup()
+{
+ const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0);
+ const size_t MaxCount = SessionsService::kDefaultMaxSessionCount;
+ const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes;
+ const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes);
+ if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0)
+ {
+ ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)",
+ Result.ExpiredByAge,
+ Result.ExpiredByCount,
+ Result.ExpiredByStorage,
+ SessionsService::kDefaultMaxSessionAgeDays,
+ MaxCount,
+ MaxBytes / (1024 * 1024));
+ }
+}
+
+void
+HttpSessionsService::EnqueueCleanupTimer()
+{
+ m_CleanupTimer.expires_after(std::chrono::hours(1));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic liveness check for tracked local client processes
+//
+
+void
+HttpSessionsService::RunLivenessCheck()
+{
+ const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness();
+ if (EndedByDeadClient > 0)
+ {
+ ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient);
+ }
+ else
+ {
+ // Debug-level so this doesn't spam at info every 30s, but lets an
+ // operator who's specifically investigating why their crashed
+ // session didn't clean up see whether anything is being tracked.
+ ZEN_DEBUG("Sessions liveness: no dead client processes found");
+ }
+}
+
+void
+HttpSessionsService::EnqueueLivenessTimer()
+{
+ // 30s strikes a balance between crash-detection latency and
+ // per-session OpenProcess/GetExitCode overhead. Active sessions with
+ // no reported pid (remote clients) are skipped in the inner loop so
+ // the cost scales with local sessions only.
+ m_LivenessTimer.expires_after(std::chrono::seconds(30));
+ m_LivenessTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunLivenessCheck();
+ EnqueueLivenessTimer();
+ });
+}
+
} // namespace zen