aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/compute/computeserver.cpp8
-rw-r--r--src/zenserver/config/config.cpp40
-rw-r--r--src/zenserver/config/config.h46
-rw-r--r--src/zenserver/diag/otlphttp.cpp4
-rw-r--r--src/zenserver/frontend/html/pages/platform_icons.js61
-rw-r--r--src/zenserver/frontend/html/pages/sessions.js1015
-rw-r--r--src/zenserver/frontend/html/theme.js108
-rw-r--r--src/zenserver/frontend/html/util/compactbinary.js10
-rw-r--r--src/zenserver/frontend/html/zen.css421
-rw-r--r--src/zenserver/hub/zenhubserver.cpp8
-rw-r--r--src/zenserver/proxy/zenproxyserver.cpp8
-rw-r--r--src/zenserver/sessions/httpsessions.cpp504
-rw-r--r--src/zenserver/sessions/httpsessions.h60
-rw-r--r--src/zenserver/sessions/inprocsessionlogsink.cpp37
-rw-r--r--src/zenserver/sessions/logtemplate.cpp390
-rw-r--r--src/zenserver/sessions/logtemplate.h42
-rw-r--r--src/zenserver/sessions/sessions.cpp1166
-rw-r--r--src/zenserver/sessions/sessions.h193
-rw-r--r--src/zenserver/stats/statsreporter.cpp90
-rw-r--r--src/zenserver/stats/statsreporter.h8
-rw-r--r--src/zenserver/storage/storageconfig.cpp4
-rw-r--r--src/zenserver/storage/storageconfig.h3
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp17
-rw-r--r--src/zenserver/zenserver.cpp69
-rw-r--r--src/zenserver/zenserver.h2
25 files changed, 3795 insertions, 519 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index b110f7538..4c39e8870 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -982,10 +982,14 @@ ZenComputeServer::Run()
SetNewState(kRunning);
- OnReady();
-
+ // Register the self-session and replay the backlog into it BEFORE
+ // OnReady disables the backlog — otherwise the in-proc session sink
+ // attaches against a disabled backlog and shows nothing from the
+ // startup window.
StartSelfSession("zencompute");
+ OnReady();
+
PostAnnounce();
EnqueueAnnounceTimer();
InitializeOrchestratorWebSocket();
diff --git a/src/zenserver/config/config.cpp b/src/zenserver/config/config.cpp
index 2a89fc637..5f53374fc 100644
--- a/src/zenserver/config/config.cpp
+++ b/src/zenserver/config/config.cpp
@@ -16,6 +16,7 @@
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zencore/session.h>
#include <zencore/string.h>
#include <zenutil/config/commandlineoptions.h>
#include <zenutil/config/environmentoptions.h>
@@ -147,6 +148,10 @@ ZenServerConfiguratorBase::AddCommonConfigOptions(LuaConfig::Options& LuaOptions
LuaOptions.AddOption("server.clean"sv, ServerOptions.IsCleanStart, "clean"sv);
LuaOptions.AddOption("server.security.configpath"sv, ServerOptions.SecurityConfigPath, "security-config-path"sv);
+ ////// sessions
+
+ LuaOptions.AddOption("sessions.url"sv, ServerOptions.SessionsTargetUrl, "sessions-url"sv);
+
////// network
LuaOptions.AddOption("network.httpclientbackend"sv, ServerOptions.HttpClient.Backend, "httpclient"sv);
@@ -205,6 +210,7 @@ struct ZenServerCmdLineOptions
std::string SecurityConfigPath;
std::string UnixSocketPath;
std::string PortStr;
+ std::string ParentSessionId;
ZenLoggingCmdLineOptions LoggingOptions;
@@ -285,10 +291,22 @@ ZenServerCmdLineOptions::AddCliOptions(cxxopts::Options& options, ZenServerConfi
.add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(ServerOptions.OwnerPid), "<identifier>");
options.add_option("lifetime",
"",
+ "parent-session",
+ "Specify parent session id used to associate this process with another session",
+ cxxopts::value<std::string>(ParentSessionId),
+ "<oid>");
+ options.add_option("lifetime",
+ "",
"child-id",
"Specify id which can be used to signal parent",
cxxopts::value<std::string>(ServerOptions.ChildId),
"<identifier>");
+ options.add_option("lifetime",
+ "",
+ "sessions-url",
+ "URL of remote zenserver to announce session to",
+ cxxopts::value<std::string>(ServerOptions.SessionsTargetUrl),
+ "<url>");
#if ZEN_PLATFORM_WINDOWS
options.add_option("lifetime",
@@ -519,6 +537,18 @@ ZenServerCmdLineOptions::ApplyOptions(cxxopts::Options& options, ZenServerConfig
ServerOptions.BasePort = Port;
}
+ if (!ParentSessionId.empty())
+ {
+ Oid ParsedParentSessionId;
+ if (!Oid::TryParse(ParentSessionId, ParsedParentSessionId) || ParsedParentSessionId == Oid::Zero)
+ {
+ throw OptionParseException(fmt::format("invalid parent session id '{}': expected a 24-character object id", ParentSessionId),
+ options.help());
+ }
+ ServerOptions.ParentSessionId = ParsedParentSessionId;
+ SetParentSessionId(ParsedParentSessionId);
+ }
+
LoggingOptions.ApplyOptions(ServerOptions.LoggingConfig);
#if ZEN_WITH_HTTPSYS
@@ -688,6 +718,16 @@ ZenServerConfiguratorBase::Configure(int argc, char* argv[])
{
m_ServerOptions.BasePort = ZenServerConfig::kDefaultBasePort;
}
+
+ // Resolve sessions target. If neither CLI nor config supplied a value,
+ // fall back to the ZEN_SESSIONS_URL environment variable. When a remote
+ // target is configured, session logs flow through SessionsServiceClient
+ // instead of the in-proc broadcast sink.
+ if (m_ServerOptions.SessionsTargetUrl.empty())
+ {
+ m_ServerOptions.SessionsTargetUrl = GetEnvVariable("ZEN_SESSIONS_URL").value_or("");
+ }
+ m_ServerOptions.UseInProcSessionLogging = m_ServerOptions.SessionsTargetUrl.empty();
}
catch (const OptionParseException& e)
{
diff --git a/src/zenserver/config/config.h b/src/zenserver/config/config.h
index d35a1a8c7..186b80b6b 100644
--- a/src/zenserver/config/config.h
+++ b/src/zenserver/config/config.h
@@ -4,6 +4,7 @@
#include <zencore/logbase.h>
#include <zencore/trace.h>
+#include <zencore/uid.h>
#include <zencore/zencore.h>
#include <zenhttp/httpserver.h>
#include <zenutil/config/loggingconfig.h>
@@ -50,27 +51,30 @@ struct ZenServerConfig
ZenSentryConfig SentryConfig;
ZenStatsConfig StatsConfig;
ZenLoggingConfig LoggingConfig;
- static constexpr int kDefaultBasePort = 8558;
- int BasePort = 0; // Service listen port; 0 = auto (resolved per mode)
- int OwnerPid = 0; // Parent process id (zero for standalone)
- bool IsDebug = false;
- bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not
- bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization
- bool IsTest = false;
- bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux)
- int CoreLimit = 0; // If set, hardware concurrency queries are capped at this number
- int LieCpu = 0;
- bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
- bool AllowPortProbing = true; // Automatically false if IsDedicated is true
- bool ShouldCrash = false; // Option for testing crash handling
- bool IsFirstRun = false;
- std::filesystem::path ConfigFile; // Path to Lua config file
- std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
- std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
- std::filesystem::path DataDir; // Root directory for state (used for testing)
- std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start)
- std::string ChildId; // Id assigned by parent process (used for lifetime management)
- std::filesystem::path SecurityConfigPath; // Path to a Json security configuration file
+ static constexpr int kDefaultBasePort = 8558;
+ int BasePort = 0; // Service listen port; 0 = auto (resolved per mode)
+ int OwnerPid = 0; // Parent process id (zero for standalone)
+ Oid ParentSessionId = Oid::Zero;
+ bool IsDebug = false;
+ bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not
+ bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization
+ bool IsTest = false;
+ bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux)
+ int CoreLimit = 0; // If set, hardware concurrency queries are capped at this number
+ int LieCpu = 0;
+ bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
+ bool AllowPortProbing = true; // Automatically false if IsDedicated is true
+ bool ShouldCrash = false; // Option for testing crash handling
+ bool IsFirstRun = false;
+ bool UseInProcSessionLogging = true; // When false, session logs are expected to be forwarded externally.
+ std::string SessionsTargetUrl; // URL of remote zenserver to announce session to (overrides ZEN_SESSIONS_URL).
+ std::filesystem::path ConfigFile; // Path to Lua config file
+ std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start)
+ std::string ChildId; // Id assigned by parent process (used for lifetime management)
+ std::filesystem::path SecurityConfigPath; // Path to a Json security configuration file
#if ZEN_WITH_TRACE
bool HasTraceCommandlineOptions = false;
diff --git a/src/zenserver/diag/otlphttp.cpp b/src/zenserver/diag/otlphttp.cpp
index d6e24cbe3..f7306a9fe 100644
--- a/src/zenserver/diag/otlphttp.cpp
+++ b/src/zenserver/diag/otlphttp.cpp
@@ -31,6 +31,10 @@ OtelHttpProtobufSink::OtelHttpProtobufSink(const std::string_view& Uri) : m_Otel
m_Encoder.AddResourceAttribute("service.version", ZEN_CFG_VERSION);
m_Encoder.AddResourceAttribute("host.name", GetMachineName());
m_Encoder.AddResourceAttribute("session.id", GetSessionIdString());
+ if (std::string_view ParentSessionId = GetParentSessionIdString(); !ParentSessionId.empty())
+ {
+ m_Encoder.AddResourceAttribute("parent_session.id", ParentSessionId);
+ }
m_Encoder.AddResourceAttribute("process.id", zen::GetCurrentProcessId());
m_TraceRecorder = new TraceRecorder(this);
diff --git a/src/zenserver/frontend/html/pages/platform_icons.js b/src/zenserver/frontend/html/pages/platform_icons.js
new file mode 100644
index 000000000..65a04c840
--- /dev/null
+++ b/src/zenserver/frontend/html/pages/platform_icons.js
@@ -0,0 +1,61 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+// SimpleIcons-style platform glyphs (viewBox 0 0 24 24, fill: currentColor)
+// used by the sessions table to render a recognizable icon instead of a raw
+// platform label. Lives in its own module because the data table is bulky and
+// other pages may want to reuse the resolver.
+
+"use strict";
+
+// Path data is intentionally one-line per entry to keep the icon table dense.
+// Unknown platforms fall through to a text cell at the call site.
+export const PLATFORM_ICONS = {
+ windows: { label: "Windows", path: "M0 3.449L9.75 2.1v9.451H0m10.949-9.602L24 0v11.4H10.949M0 12.6h9.75v9.451L0 20.699M10.949 12.6H24V24l-13.051-1.351" },
+ macos: { label: "macOS", path: "M17.05 20.28c-.98.95-2.05.8-3.08.35-1.09-.46-2.09-.48-3.24 0-1.44.62-2.2.44-3.06-.35C2.79 15.25 3.51 7.59 9.05 7.31c1.35.07 2.29.74 3.08.8 1.18-.24 2.31-.93 3.57-.84 1.51.12 2.65.72 3.4 1.8-3.12 1.87-2.38 5.98.48 7.13-.57 1.5-1.31 2.99-2.54 4.09l.01-.01zM12.03 7.25c-.15-2.23 1.66-4.07 3.74-4.25.29 2.58-2.34 4.5-3.74 4.25z" },
+ ios: { label: "iOS", path: "M17.05 20.28c-.98.95-2.05.8-3.08.35-1.09-.46-2.09-.48-3.24 0-1.44.62-2.2.44-3.06-.35C2.79 15.25 3.51 7.59 9.05 7.31c1.35.07 2.29.74 3.08.8 1.18-.24 2.31-.93 3.57-.84 1.51.12 2.65.72 3.4 1.8-3.12 1.87-2.38 5.98.48 7.13-.57 1.5-1.31 2.99-2.54 4.09l.01-.01zM12.03 7.25c-.15-2.23 1.66-4.07 3.74-4.25.29 2.58-2.34 4.5-3.74 4.25z" },
+ linux: { label: "Linux", path: "M12.504 0c-.155 0-.315.008-.48.021-4.226.333-3.105 4.807-3.17 6.298-.076 1.092-.3 1.953-1.05 3.02-.885 1.051-2.127 2.75-2.716 4.521-.278.832-.41 1.684-.287 2.489a.424.424 0 00-.11.135c-.26.268-.45.6-.663.839-.2.271-.53.4-.953.58-.42.17-.94.33-1.31.84-.5.84-.33 1.83.03 2.72.39.89.95 1.64.89 2.13-.06.72.21 1.24.59 1.48.38.24.83.24 1.17.24.34 0 .57-.01.67-.06.19-.1.38-.25.55-.26.22-.02.43.11.72.26.67.35 1.3.43 1.88.3.57-.13 1.08-.44 1.52-.78.9-.74 1.7-1.5 2.6-1.63.24-.03.5-.05.78-.05.13 0 .25 0 .39.02.11.01.26.1.5.26.25.17.6.4 1 .59.4.17.87.32 1.4.3.59-.02 1.18-.22 1.73-.63 1.57-1.19 4.21-.97 5.13-2.08.11-.14.18-.3.2-.48.02-.18 0-.36-.07-.54-.06-.19-.17-.38-.32-.57-.15-.19-.35-.38-.56-.55-.38-.31-.58-.67-.57-1.11.01-.28.14-.58.28-.85.11-.28.24-.54.24-.81-.02-.28-.13-.55-.36-.77-.23-.23-.55-.42-1.01-.51a.424.424 0 00-.14-.02c-.3-.04-.57-.04-.87-.04-.13-.31-.28-.62-.43-.91-.9-1.72-2.23-3.13-3.27-4.31-.92-1.02-1.55-2-1.62-3.07-.12-.9-.07-1.94-.1-2.93-.02-.92-.15-1.81-.67-2.5-.49-.7-1.32-1.22-2.73-1.22z" },
+ wine: { label: "Wine", path: "M8 2h8l-1 7a4 4 0 01-3 4v5h3v2H9v-2h3v-5a4 4 0 01-3-4L8 2z" },
+ android: { label: "Android", path: "M17.523 15.3414c-.5511 0-.9993-.4486-.9993-.9997s.4482-.9993.9993-.9993c.5511 0 .9993.4482.9993.9993.0001.5511-.4482.9997-.9993.9997m-11.046 0c-.5511 0-.9993-.4486-.9993-.9997s.4482-.9993.9993-.9993c.5511 0 .9993.4482.9993.9993 0 .5511-.4482.9997-.9993.9997m11.4045-6.02l1.9973-3.4592a.416.416 0 00-.1521-.5676.416.416 0 00-.5677.1521l-2.0223 3.503C15.5902 8.2439 13.8533 7.8508 12 7.8508s-3.5902.3931-5.1367 1.0989L4.841 5.4467a.4161.4161 0 00-.5677-.1521.4157.4157 0 00-.1521.5676l1.9973 3.4592C2.6889 11.1867.3432 14.6589 0 18.761h24c-.3432-4.1021-2.6889-7.5743-6.1185-9.4396" },
+ playstation: { label: "PlayStation", path: "M8.985 2.596v17.548l3.915 1.261V6.688c0-.69.304-1.151.794-.991.636.181.76.814.76 1.505v5.875c2.441 1.193 4.362 0 4.362-3.118 0-3.198-1.13-4.63-4.442-5.76-1.313-.444-3.697-1.203-5.389-1.603zM0 17.81c.069.24.213.489.487.728C4.024 21.22 9.45 22.395 15.03 22.395c.58 0 1.142-.034 1.725-.08-5.423-1.39-9.33-3.77-15.203-4.87a5.78 5.78 0 01-1.55-.364v.728zm18.7-8.97c.057-.035.114-.057.194-.08.695-.148 1.15.217 1.15.908 0 .706-.478 1.283-1.162 1.486-.08.023-.137.034-.194.057v4.74c.079-.023.148-.034.228-.057 3.426-1.193 4.374-2.796 4.374-5.502 0-2.637-1.37-4.063-3.357-4.748-.387-.137-.764-.228-1.162-.32l-.068-.023v3.54l-.003-.001z" },
+ xbox: { label: "Xbox", path: "M4.102 21.033C6.211 22.881 8.977 24 12 24c3.026 0 5.789-1.119 7.902-2.965 1.16-1.016-4.553-6.929-7.902-9.518-3.349 2.589-9.063 8.499-7.898 9.516zm11.08-18.52c2.699-1.159 5.062-1.169 6.52-.546l.025.033c-1.377-2.152-3.624-4.001-7.004-3.978-2.049 0-4.062.826-5.725 2.208 1.964.468 4.114 1.404 6.184 2.283zM2.27 1.976l.025-.033c1.458-.623 3.82-.613 6.519.546 2.07-.879 4.22-1.815 6.184-2.283C13.335.824 11.32-.002 9.272 0 5.891-.02 3.646 1.828 2.27 1.976zM1.62 19.46l-.012.003C.597 17.8 0 15.838 0 13.749c0-1.749.425-3.399 1.157-4.85.9-1.784 4.126-5.59 5.73-7.37.118-.131-4.425 1.976-5.267 9.931-.025.221-.025.442-.025.663 0 2.586.741 5 2.025 7.017zm20.763 0l-.011-.003c1.283-2.017 2.025-4.431 2.025-7.017 0-.22 0-.442-.025-.663-.842-7.955-5.386-10.062-5.267-9.93 1.604 1.779 4.83 5.585 5.73 7.37.732 1.45 1.157 3.1 1.157 4.849 0 2.09-.596 4.051-1.609 5.714l.013.003-.014-.324z" },
+ nintendo: { label: "Nintendo", path: "M14.176 24h3.674c3.376 0 6.15-2.774 6.15-6.15V6.15C24 2.775 21.226 0 17.85 0H14.16c-.205 0-.38.174-.38.38v23.24c0 .206.19.38.396.38zM8.252 24c.212 0 .39-.174.39-.384V.374c0-.21-.178-.374-.39-.374h-2.1A6.167 6.167 0 0 0 0 6.15v11.7C0 21.224 2.775 24 6.152 24h2.1zm-4.59-15.763a2.578 2.578 0 0 1 2.58-2.58 2.577 2.577 0 0 1 2.578 2.58 2.579 2.579 0 1 1-5.157 0zm12.556 11.928a3.063 3.063 0 0 1 3.067-3.065 3.063 3.063 0 0 1 3.065 3.065 3.067 3.067 0 0 1-3.065 3.07 3.065 3.065 0 0 1-3.067-3.07z" },
+};
+
+// Resolve a platform string (as reported by the client) to an icon entry.
+// Intentionally liberal so UE-style variants like "Win64", "PS5", "XSX",
+// "NintendoSwitch" all land on the right icon.
+export function resolve_platform_icon(platform)
+{
+ const p = platform.toLowerCase();
+ if (p.includes("windows") || p === "win32" || p === "win64" || p === "win") return PLATFORM_ICONS.windows;
+ if (p === "wine") return PLATFORM_ICONS.wine;
+ if (p.includes("android")) return PLATFORM_ICONS.android;
+ if (p === "ios" || p === "iphone" || p === "ipad" || p === "ipados" || p === "tvos") return PLATFORM_ICONS.ios;
+ if (p === "mac" || p === "macos" || p === "osx" || p === "darwin") return PLATFORM_ICONS.macos;
+ if (p === "linux") return PLATFORM_ICONS.linux;
+ if (p.includes("playstation") || /^ps\d/.test(p) || p === "psvita") return PLATFORM_ICONS.playstation;
+ if (p.includes("xbox") || p === "xsx" || p === "xss") return PLATFORM_ICONS.xbox;
+ if (p.includes("nintendo") || p === "switch") return PLATFORM_ICONS.nintendo;
+ return null;
+}
+
+// Build a <span> element representing the platform — either an inline SVG
+// glyph (when the platform resolves) or a plain text fallback.
+export function make_platform_cell(platform)
+{
+ const el = document.createElement("span");
+ if (!platform) { return el; }
+ const icon = resolve_platform_icon(platform);
+ if (!icon)
+ {
+ // Unknown platform — fall back to the raw label.
+ el.textContent = platform;
+ return el;
+ }
+ el.className = "platform-icon";
+ el.title = icon.label;
+ el.setAttribute("aria-label", icon.label);
+ // Paths are hard-coded (no user-controlled input), so innerHTML is safe.
+ el.innerHTML = `<svg viewBox="0 0 24 24" role="img" focusable="false"><path d="${icon.path}"/></svg>`;
+ return el;
+}
diff --git a/src/zenserver/frontend/html/pages/sessions.js b/src/zenserver/frontend/html/pages/sessions.js
index c74ede14e..70b850698 100644
--- a/src/zenserver/frontend/html/pages/sessions.js
+++ b/src/zenserver/frontend/html/pages/sessions.js
@@ -4,7 +4,69 @@
import { ZenPage } from "./page.js"
import { Fetcher } from "../util/fetcher.js"
+import { CbObject } from "../util/compactbinary.js"
import { Table, PropTable } from "../util/widgets.js"
+import { make_platform_cell } from "./platform_icons.js"
+
+// Run @p fn and swallow any thrown error, but log it at debug level under
+// @p label so the failure isn't completely invisible. Use this in places
+// where the failure mode is genuinely "drop the frame and move on" — JSON /
+// CB parse failures, optional WebSocket setup, transient send errors. The
+// debug-level log keeps normal consoles clean; surface them by enabling
+// "Verbose" in DevTools.
+function quietly(label, fn)
+{
+ try { return fn(); }
+ catch (e)
+ {
+ console.debug(`[sessions] ${label}:`, e);
+ return undefined;
+ }
+}
+
+// Dev tools read better with 24h + zero-padded fields; don't defer to the
+// browser's default locale which is 12h AM/PM for en-US.
+const TIME_OPTS = { hour12: false, hour: "2-digit", minute: "2-digit", second: "2-digit" };
+const DATE_OPTS = { year: "numeric", month: "2-digit", day: "2-digit", ...TIME_OPTS };
+
+// UE log levels in ascending order of severity. Each incoming entry's
+// level string (lowercased) maps to a numeric rank we can compare against
+// the user-selected threshold when filtering. Both the short ("warn") and
+// long ("warning") spellings are covered because zencore emits the long
+// form but older clients / tests may use the short one.
+const LEVEL_RANK = {
+ trace: 0,
+ debug: 1,
+ info: 2,
+ warning: 3, warn: 3,
+ error: 4, err: 4,
+ critical: 5,
+};
+
+// Cap on the in-DOM log line count. The server's deque is bounded
+// (MaxLogEntries on the C++ side) but the WS push delivers every new
+// entry forever during a long Cook, so the browser DOM would grow
+// without bound. At 5000 lines the panel still feels live for tail-
+// following while keeping the page responsive. Older entries fall off
+// the far end on every append.
+const MAX_LOG_LINES_IN_DOM = 5000;
+
+// Level-filter dropdown options. `rank` is the minimum rank that survives
+// — entries with a lower rank are hidden. -1 disables level filtering.
+const LEVEL_FILTER_OPTIONS = [
+ { value: "all", label: "All levels", rank: -1 },
+ { value: "debug", label: "Debug+", rank: 1 },
+ { value: "info", label: "Info+", rank: 2 },
+ { value: "warn", label: "Warn+", rank: 3 },
+ { value: "error", label: "Error+", rank: 4 },
+];
+
+// Double-chevron icons for the expand / collapse panel toggle. Up when
+// collapsed (click to grow the log panel upward into the table's space);
+// down when expanded (click to shrink back down). currentColor so the
+// surrounding button styles control tinting.
+const ICON_CHEVRON_UP = '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><polyline points="6 17 12 11 18 17"/><polyline points="6 11 12 5 18 11"/></svg>';
+const ICON_CHEVRON_DOWN = '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><polyline points="6 7 12 13 18 7"/><polyline points="6 13 12 19 18 13"/></svg>';
function fmt_date(iso)
{
@@ -13,15 +75,15 @@ function fmt_date(iso)
const now = new Date();
if (d.getFullYear() === now.getFullYear() && d.getMonth() === now.getMonth() && d.getDate() === now.getDate())
{
- return d.toLocaleTimeString();
+ return d.toLocaleTimeString([], TIME_OPTS);
}
- return d.toLocaleString();
+ return d.toLocaleString([], DATE_OPTS);
}
function fmt_time(iso)
{
if (!iso) { return ""; }
- return new Date(iso).toLocaleTimeString();
+ return new Date(iso).toLocaleTimeString([], TIME_OPTS);
}
////////////////////////////////////////////////////////////////////////////////
@@ -33,32 +95,43 @@ export class Page extends ZenPage
{
this.set_title("sessions");
- this._status = this.get_param("status", "active");
+ this._status = this.get_param("status", "all");
const section = this.add_section("Sessions");
- section._parent.inner().classList.add("sessions-section");
-
- this._init_status_tabs(section, this._status);
+ const section_dom = section._parent.inner();
+ section_dom.classList.add("sessions-section");
+ // The "Sessions" nav item in the banner already identifies the page;
+ // drop the auto-generated section heading so it doesn't duplicate.
+ const heading = section_dom.querySelector(":scope > h1, :scope > h2");
+ if (heading) { heading.remove(); }
const query = (this._status === "ended" || this._status === "all") ? "?status=" + this._status : "";
const data = await new Fetcher().resource("/sessions/" + query).json();
const sessions = data.sessions || [];
this._self_id = data.self_id || null;
- // Layout: table on the left, detail panel on the right
- this._container = section.tag().classify("sessions-layout");
- this._table_host = this._container.tag().classify("sessions-table");
- this._detail_panel = this._container.tag().classify("sessions-detail");
- this._detail_panel.tag().classify("sessions-detail-placeholder").text("Select a session to view details.");
+ // Flat vertical layout: header row, then table, then the bottom panel
+ // (tabs for log and metadata). All session-level info that used to
+ // live in a side panel is now shown inline in the table columns.
+ this._init_status_tabs(section, this._status);
+ this._table_host = section.tag().classify("sessions-table");
this._selected_id = this._self_id;
this._selected_row = null;
- this._page_size = 25;
+ this._page_size = 10;
this._page = 0;
-
- // Log panel below the table/detail layout
- this._log_panel = section.tag().classify("sessions-log-panel");
- this._log_panel.inner().style.display = "none";
- this._log_poll_timer = null;
+ this._text_filter = "";
+ this._sort_key = "created_at";
+ this._sort_asc = false;
+
+ this._panel = section.tag().classify("sessions-log-panel");
+ this._panel.inner().style.display = "none";
+ this._active_tab = "log";
+ this._log_expanded = false;
+ // Persist the level threshold across session selections so users
+ // don't have to re-pick after clicking a different session.
+ this._log_min_level = -1;
+ this._log_min_level_name = "all";
+ this._collapsed_session_groups = new Set();
this._render_sessions(sessions);
this._connect_ws();
@@ -68,8 +141,16 @@ export class Page extends ZenPage
{
const status = this._status;
- // Clear existing table content
+ // Clear existing table content (and any prior pager contents; the
+ // pager lives in the header row but its state depends on what we're
+ // about to render).
this._table_host.inner().replaceChildren();
+ if (this._pager_host)
+ {
+ this._pager_host.replaceChildren();
+ }
+
+ this._last_sessions = sessions;
if (sessions.length === 0)
{
@@ -79,74 +160,247 @@ export class Page extends ZenPage
return;
}
- let columns;
+ // Apply the text filter (case-insensitive substring match across the
+ // session fields a user is likely to scan for: id, appname, mode).
+ const filter = this._text_filter;
+ let filtered = filter
+ ? sessions.filter(s => {
+ const haystack = [s.id, s.appname, s.mode].filter(v => v).join(" ").toLowerCase();
+ return haystack.includes(filter);
+ })
+ : sessions;
+
+ if (filtered.length === 0)
+ {
+ this._table_host.tag().classify("empty-state").text("No sessions match the filter.");
+ this._selected_row = null;
+ return;
+ }
+
+ // When the log panel is expanded, collapse the row set to just the
+ // selected session so the log gets the maximum vertical real estate.
+ // The expand toggle lives in the panel header — see _show_session_panel.
+ if (this._log_expanded && this._selected_id)
+ {
+ const selected = sessions.find(s => s.id === this._selected_id);
+ if (selected) { filtered = [selected]; }
+ }
+
+ // Column specs carry both the header label and how to extract the
+ // real sort value so date columns compare chronologically rather
+ // than by locale-formatted text.
+ const str_val = (field) => (s) => (s[field] || "").toLowerCase();
+ const date_val = (field) => (s) => s[field] ? new Date(s[field]).getTime() : 0;
+
+ const common = [
+ { name: "appname", key: "appname", kind: "str", get: str_val("appname") },
+ { name: "mode", key: "mode", kind: "str", get: str_val("mode") },
+ { name: "platform", key: "platform", kind: "str", get: str_val("platform") },
+ { name: "id", key: "id", kind: "str", get: str_val("id") },
+ { name: "created", key: "created_at", kind: "date", get: date_val("created_at") },
+ ];
+ let last_col;
if (status === "all")
{
- columns = ["id", "appname", "mode", "created", "last activity"];
+ last_col = { name: "last activity", key: "last_activity", kind: "date",
+ get: s => new Date(s.ended_at || s.updated_at || 0).getTime() };
}
else if (status === "ended")
{
- columns = ["id", "appname", "mode", "created", "ended"];
+ last_col = { name: "ended", key: "ended_at", kind: "date", get: date_val("ended_at") };
}
else
{
- columns = ["id", "appname", "mode", "created", "updated"];
+ last_col = { name: "updated", key: "updated_at", kind: "date", get: date_val("updated_at") };
}
- this._last_sessions = sessions;
- const total = sessions.length;
+ const col_specs = [...common, last_col];
+
+ // Pick the active sort column (fall back to created_at if the current
+ // sort key isn't in this tab's column set — e.g. switching from "all"
+ // back to "ended" after sorting by last_activity).
+ const sort_col = col_specs.find(c => c.key === this._sort_key) || col_specs.find(c => c.key === "created_at");
+ const dir = this._sort_asc ? 1 : -1;
+ const compare_sessions = (a, b) => {
+ const av = sort_col.get(a), bv = sort_col.get(b);
+ if (av < bv) return -1 * dir;
+ if (av > bv) return 1 * dir;
+ return 0;
+ };
+ const grouped = this._build_session_groups(filtered);
+ grouped.parents.sort(compare_sessions);
+ for (const parent of grouped.parents)
+ {
+ grouped.children.get(parent.id)?.sort(compare_sessions);
+ }
+
+ const total = grouped.parents.length;
const page_count = this._page_size > 0 ? Math.ceil(total / this._page_size) : 1;
if (this._page >= page_count) { this._page = Math.max(0, page_count - 1); }
const start = this._page_size > 0 ? this._page * this._page_size : 0;
- const visible = this._page_size > 0 ? sessions.slice(start, start + this._page_size) : sessions;
+ const visible = this._page_size > 0 ? grouped.parents.slice(start, start + this._page_size) : grouped.parents;
- const table = new Table(this._table_host, columns, Table.Flag_FitLeft);
+ const column_names = col_specs.map(c => c.name);
+ const table = new Table(this._table_host, column_names, Table.Flag_FitLeft, -1);
+
+ // Attach header click handlers + active-column indicator.
+ const zen_table = this._table_host.inner().querySelector(".zen_table");
+ const header_elem = zen_table ? zen_table.firstElementChild : null;
+ if (header_elem)
+ {
+ const header_cells = header_elem.children;
+ for (let i = 0; i < col_specs.length; i++)
+ {
+ const col = col_specs[i];
+ const cell = header_cells[i];
+ if (!cell) { continue; }
+ cell.style.cursor = "pointer";
+ cell.style.userSelect = "none";
+ if (col.key === sort_col.key)
+ {
+ cell.textContent = col.name + (this._sort_asc ? " \u25B2" : " \u25BC");
+ cell.classList.add("sessions-sort-active");
+ }
+ cell.addEventListener("click", () => {
+ if (this._sort_key === col.key)
+ {
+ this._sort_asc = !this._sort_asc;
+ }
+ else
+ {
+ this._sort_key = col.key;
+ // New column defaults: dates start descending (newest
+ // first — the natural reading for timestamps); string
+ // columns start ascending (A→Z).
+ this._sort_asc = col.kind !== "date";
+ }
+ this._page = 0;
+ this._render_sessions(this._last_sessions);
+ });
+ }
+ }
let new_selected_row = null;
let new_selected_session = null;
- for (const session of visible)
- {
+ const render_session_row = (session, is_child = false, child_count = 0) => {
const created = fmt_date(session.created_at);
const updated = fmt_date(session.updated_at);
const ended = fmt_date(session.ended_at);
const mode = session.mode || "-";
+ const appname = session.appname || "-";
+ const platform = session.platform || "";
+ const full_id = session.id || "";
+ // Elide the middle of the 24-char OID so the column stays narrow;
+ // the full id is still available as a tooltip on the cell below.
+ const id_display = full_id.length > 12
+ ? full_id.slice(0, 8) + "\u2026" + full_id.slice(-4)
+ : (full_id || "-");
+
let row_values;
if (status === "all")
{
const last_activity = session.ended_at ? ended : updated;
- row_values = [session.id || "-", session.appname || "-", mode, created, last_activity];
+ row_values = [appname, mode, platform, id_display, created, last_activity];
}
else if (status === "ended")
{
- row_values = [session.id || "-", session.appname || "-", mode, created, ended];
+ row_values = [appname, mode, platform, id_display, created, ended];
}
else
{
- row_values = [session.id || "-", session.appname || "-", mode, created, updated];
+ row_values = [appname, mode, platform, id_display, created, updated];
}
const row = table.add_row(...row_values);
+ // Swap the platform cell's text for a recognizable icon. Sort
+ // already runs on session.platform so the cell content doesn't
+ // affect ordering.
+ const platform_cell = row.get_cell(2).inner();
+ platform_cell.replaceChildren(make_platform_cell(platform));
+
+ if (full_id)
+ {
+ row.get_cell(3).inner().title = full_id;
+ }
+
+ // Indicator layout in the appname cell: [group toggle] [child elbow]
+ // [dot] appname [this] [log]. The pills sit after the name so their
+ // widths don't push names around and misalign the column across rows.
+ const appname_cell = row.get_cell(0);
+ if (child_count > 0)
+ {
+ const collapsed = this._collapsed_session_groups.has(session.id);
+ const toggle = document.createElement("button");
+ toggle.type = "button";
+ toggle.className = "sessions-group-toggle";
+ toggle.textContent = collapsed ? "\u25B8" : "\u25BE";
+ toggle.title = collapsed ? "Expand child sessions" : "Collapse child sessions";
+ toggle.addEventListener("click", (ev) => {
+ ev.stopPropagation();
+ if (collapsed)
+ {
+ this._collapsed_session_groups.delete(session.id);
+ }
+ else
+ {
+ this._collapsed_session_groups.add(session.id);
+ }
+ this._render_sessions(this._last_sessions);
+ });
+ appname_cell.inner().prepend(toggle);
+ }
+ else if (is_child)
+ {
+ const spacer = document.createElement("span");
+ spacer.className = "sessions-group-child-spacer";
+ spacer.textContent = "\u2514";
+ appname_cell.inner().prepend(spacer);
+ }
+ else
+ {
+ const spacer = document.createElement("span");
+ spacer.className = "sessions-group-toggle-spacer";
+ appname_cell.inner().prepend(spacer);
+ }
+
if (status === "all" && !session.ended_at)
{
- const id_cell = row.get_cell(0);
const dot = document.createElement("span");
dot.className = "health-dot health-green";
dot.style.marginRight = "6px";
dot.style.width = "8px";
dot.style.height = "8px";
dot.title = "active";
- id_cell.inner().prepend(dot);
+ appname_cell.inner().insertBefore(dot, appname_cell.inner().firstChild.nextSibling);
}
if (this._self_id && session.id === this._self_id)
{
const pill = document.createElement("span");
- pill.className = "sessions-self-pill";
+ pill.className = "sessions-pill sessions-self-pill";
pill.textContent = "this";
- row.get_cell(1).inner().prepend(pill);
+ appname_cell.inner().appendChild(pill);
+ }
+
+ if (session.log_count)
+ {
+ const log_pill = document.createElement("span");
+ log_pill.className = "sessions-pill sessions-log-indicator-pill";
+ log_pill.textContent = "log";
+ log_pill.title = session.log_count + " log entr" + (session.log_count === 1 ? "y" : "ies");
+ appname_cell.inner().appendChild(log_pill);
+ }
+
+ const row_elem = row.inner();
+ if (is_child)
+ {
+ for (const cell of row_elem.children)
+ {
+ cell.classList.add("sessions-child-row");
+ }
}
// Restore selection
@@ -157,12 +411,28 @@ export class Page extends ZenPage
}
// Table rows use display:contents so we attach click to each cell
- const row_elem = row.inner();
for (const cell of row_elem.children)
{
cell.style.cursor = "pointer";
cell.addEventListener("click", () => this._select_session(row, session));
}
+ };
+
+ const render_session_tree = (session, is_child = false) => {
+ const children = grouped.children.get(session.id) || [];
+ render_session_row(session, is_child, children.length);
+ if (!this._collapsed_session_groups.has(session.id))
+ {
+ for (const child of children)
+ {
+ render_session_tree(child, true);
+ }
+ }
+ };
+
+ for (const session of visible)
+ {
+ render_session_tree(session);
}
this._selected_row = null;
@@ -171,40 +441,106 @@ export class Page extends ZenPage
this._select_session(new_selected_row, new_selected_session);
}
- if (this._page_size > 0 && total > this._page_size)
+ this._render_pager(total, page_count);
+ }
+
+ _build_session_groups(sessions)
+ {
+ const by_id = new Map();
+ for (const session of sessions)
{
- const footer = document.createElement("div");
- footer.className = "sessions-pager";
+ if (session.id)
+ {
+ by_id.set(session.id, session);
+ }
+ }
- const make_btn = (label, enabled, on_click) => {
- const btn = document.createElement("button");
- btn.className = "history-tab";
- btn.textContent = label;
- btn.disabled = !enabled;
- if (enabled)
+ const parents = [];
+ const children = new Map();
+ for (const session of sessions)
+ {
+ const parent_id = session.parent_session_id;
+ if (parent_id && by_id.has(parent_id) && parent_id !== session.id)
+ {
+ let group = children.get(parent_id);
+ if (!group)
{
- btn.addEventListener("click", on_click);
+ group = [];
+ children.set(parent_id, group);
}
- return btn;
- };
+ group.push(session);
+ }
+ else
+ {
+ parents.push(session);
+ }
+ }
- footer.appendChild(make_btn("\u25C0", this._page > 0, () => {
- this._page--;
- this._render_sessions(sessions);
- }));
+ // Keep the currently selected child visible when live updates rebuild the
+ // table by automatically expanding the group that contains it.
+ if (this._selected_id)
+ {
+ for (const [parent_id, group] of children)
+ {
+ if (group.some(s => s.id === this._selected_id))
+ {
+ this._collapsed_session_groups.delete(parent_id);
+ break;
+ }
+ }
+ }
- const label = document.createElement("span");
- label.className = "sessions-pager-label";
- label.textContent = `${this._page + 1} / ${page_count}`;
- footer.appendChild(label);
+ return { parents, children };
+ }
- footer.appendChild(make_btn("\u25B6", this._page < page_count - 1, () => {
- this._page++;
- this._render_sessions(sessions);
- }));
+ // Shared button.history-tab builder used for both the pager arrows and
+ // the status-mode tab strip. opts: { active?, disabled?, on_click? }.
+ _make_history_tab(label, opts)
+ {
+ const btn = document.createElement("button");
+ btn.className = "history-tab";
+ btn.textContent = label;
+ if (opts.active) { btn.classList.add("active"); }
+ if (opts.disabled) { btn.disabled = true; }
+ if (opts.on_click && !opts.disabled)
+ {
+ btn.addEventListener("click", opts.on_click);
+ }
+ return btn;
+ }
- this._table_host.inner().appendChild(footer);
+ _render_pager(total, page_count)
+ {
+ if (!this._pager_host)
+ {
+ return;
+ }
+ this._pager_host.replaceChildren();
+ if (!(this._page_size > 0 && total > this._page_size))
+ {
+ return;
}
+
+ this._pager_host.appendChild(this._make_history_tab("\u25C0", {
+ disabled: this._page === 0,
+ on_click: () => {
+ this._page--;
+ this._render_sessions(this._last_sessions);
+ },
+ }));
+
+ const label = document.createElement("span");
+ label.className = "sessions-pager-label";
+ label.textContent = `${this._page + 1} / ${page_count}`;
+ this._pager_host.appendChild(label);
+
+ this._pager_host.appendChild(this._make_history_tab("\u25B6", {
+ disabled: this._page >= page_count - 1,
+ on_click: () => {
+ this._page++;
+ this._render_sessions(this._last_sessions);
+ },
+ }));
}
_filter_sessions(all_sessions)
@@ -222,11 +558,15 @@ export class Page extends ZenPage
_connect_ws()
{
- try
- {
+ quietly("ws connect", () => {
const proto = location.protocol === "https:" ? "wss:" : "ws:";
const ws = new WebSocket(`${proto}//${location.host}/sessions/ws`);
- try { this._ws_paused = localStorage.getItem("zen-ws-paused") === "true"; } catch (e) { this._ws_paused = false; }
+ // Log-push frames arrive as compact-binary over binary frames;
+ // asking for ArrayBuffer (default in modern browsers, explicit
+ // here) lets us feed the bytes straight into our CB parser.
+ ws.binaryType = "arraybuffer";
+ this._ws_paused = quietly("ws-paused storage read", () =>
+ localStorage.getItem("zen-ws-paused") === "true") === true;
document.addEventListener("zen-ws-toggle", (e) => {
this._ws_paused = e.detail.paused;
});
@@ -236,52 +576,189 @@ export class Page extends ZenPage
{
return;
}
- try
+ // Two transports share this socket:
+ // - text/JSON: the session-list snapshots broadcast on a
+ // timer (untyped for backward compat).
+ // - binary/CB: event-driven log deltas, stamped with
+ // type="log" so future frame types can be added.
+ if (typeof ev.data === "string")
{
- const data = JSON.parse(ev.data);
- if (data.self_id) { this._self_id = data.self_id; }
- const all_sessions = data.sessions || [];
- const filtered = this._filter_sessions(all_sessions);
- this._render_sessions(filtered);
+ quietly("ws json frame", () => {
+ const data = JSON.parse(ev.data);
+ if (data.self_id) { this._self_id = data.self_id; }
+ const all_sessions = data.sessions || [];
+ const filtered = this._filter_sessions(all_sessions);
+ this._render_sessions(filtered);
+ });
+ }
+ else if (ev.data instanceof ArrayBuffer)
+ {
+ quietly("ws cb frame", () => {
+ const bytes = new Uint8Array(ev.data);
+ // CbObject extends CbFieldView, not CbObjectView —
+ // to_js_object() lives on CbObjectView.prototype.
+ // Bridge via as_object() which wraps the field as
+ // a view of the same underlying bytes.
+ const frame = new CbObject(bytes).as_object().to_js_object();
+ this._handle_ws_frame(frame);
+ });
}
- catch (e) { /* ignore parse errors */ }
};
+ ws.onopen = () => {
+ // Resubscribe after a (re)connect if a session is already
+ // selected, so live tailing resumes without a reselect.
+ this._resubscribe_log();
+ };
ws.onclose = () => { this._ws = null; };
ws.onerror = () => { ws.close(); };
this._ws = ws;
+ });
+ }
+
+ _handle_ws_frame(frame)
+ {
+ if (frame && frame.type === "log")
+ {
+ // Guard against stale deltas that arrive after the user has
+ // switched sessions — the server does its best but there's
+ // always a window.
+ if (frame.session !== this._log_session_id)
+ {
+ return;
+ }
+ if (typeof frame.cursor === "number" && frame.cursor < this._log_cursor)
+ {
+ // Cursor regressed (session reset while we were subscribed).
+ this._resync_log_from_zero();
+ return;
+ }
+ this._log_cursor = frame.cursor || this._log_cursor;
+ if (Array.isArray(frame.entries) && frame.entries.length > 0)
+ {
+ this._append_log_entries(frame.entries);
+ }
}
- catch (e) { /* WebSocket not available */ }
+ }
+
+ // Wipe the panel and re-replay the log from cursor 0, then re-
+ // subscribe so the WS feeds deltas from the fresh tail. Used both
+ // when the WS frame reports a cursor regression (session reset
+ // while we were subscribed) and when an HTTP refetch sees the same.
+ // Returns the underlying fetch promise so callers can await if they
+ // need ordering guarantees.
+ _resync_log_from_zero()
+ {
+ this._log_cursor = 0;
+ if (this._log_body) { this._log_body.replaceChildren(); }
+ return this._fetch_log().then(() => this._subscribe_log());
+ }
+
+ _ws_send(obj)
+ {
+ const ws = this._ws;
+ if (!ws || ws.readyState !== WebSocket.OPEN) { return false; }
+ return quietly("ws send", () => { ws.send(JSON.stringify(obj)); return true; }) === true;
+ }
+
+ _subscribe_log()
+ {
+ if (!this._log_session_id) { return; }
+ // Don't subscribe until the initial replay has resolved — the
+ // cursor is stale 0 until then and the server would flush the
+ // entire history we're about to fetch via HTTP, duplicating
+ // every line in the DOM.
+ if (!this._log_fetch_done) { return; }
+ this._ws_send({ type: "sub_log", session: this._log_session_id, cursor: this._log_cursor | 0 });
+ }
+
+ _unsubscribe_log()
+ {
+ this._ws_send({ type: "unsub_log" });
+ }
+
+ // Called on ws.onopen to restore the subscription after a reconnect.
+ _resubscribe_log()
+ {
+ this._subscribe_log();
}
_init_status_tabs(host, active_status)
{
+ const row = document.createElement("div");
+ row.className = "sessions-header-row";
+ host.tag().inner().appendChild(row);
+
const tabs_el = document.createElement("div");
tabs_el.className = "history-tabs";
- tabs_el.style.marginBottom = "8px";
- tabs_el.style.width = "fit-content";
- host.tag().inner().appendChild(tabs_el);
+ row.appendChild(tabs_el);
const make_tab = (label, mode) => {
- const btn = document.createElement("button");
- btn.className = "history-tab";
- btn.textContent = label;
- if (mode === active_status)
- {
- btn.classList.add("active");
- }
- btn.addEventListener("click", () => {
- if (mode === active_status) { return; }
- this.set_param("status", mode);
- this.reload();
- });
- tabs_el.appendChild(btn);
+ tabs_el.appendChild(this._make_history_tab(label, {
+ active: mode === active_status,
+ on_click: mode === active_status ? null : () => {
+ this.set_param("status", mode);
+ this.reload();
+ },
+ }));
};
make_tab("Active", "active");
make_tab("Ended", "ended");
make_tab("All", "all");
+
+ const filter_input = document.createElement("input");
+ filter_input.type = "search";
+ filter_input.className = "sessions-list-filter";
+ filter_input.placeholder = "Filter\u2026";
+ filter_input.autocomplete = "off";
+ filter_input.spellcheck = false;
+ filter_input.addEventListener("input", () => {
+ this._text_filter = filter_input.value.toLowerCase().trim();
+ this._page = 0;
+ if (this._last_sessions)
+ {
+ this._render_sessions(this._last_sessions);
+ }
+ });
+ row.appendChild(filter_input);
+
+ // Right-aligned pager host; populated per-render in _render_sessions
+ // so the arrows don't live inside the table (where they'd shift
+ // vertically as the table grows/shrinks between pages).
+ const spacer = document.createElement("span");
+ spacer.style.flex = "1";
+ row.appendChild(spacer);
+
+ this._pager_host = document.createElement("div");
+ this._pager_host.className = "sessions-header-pager";
+ row.appendChild(this._pager_host);
+ }
+
+ _session_detail_metadata(session)
+ {
+ const details = {
+ status: session.ended_at ? "ended" : "active",
+ session_id: session.id || "-",
+ parent_session_id: session.parent_session_id || "-",
+ appname: session.appname || "-",
+ mode: session.mode || "-",
+ platform: session.platform || "-",
+ pid: session.pid || "-",
+ jobid: session.jobid || "-",
+ created_at: session.created_at ? fmt_date(session.created_at) : "-",
+ updated_at: session.updated_at ? fmt_date(session.updated_at) : "-",
+ };
+ if (session.ended_at)
+ {
+ details.ended_at = fmt_date(session.ended_at);
+ }
+ if (session.log_count)
+ {
+ details.log_count = session.log_count;
+ }
+ return details;
}
_select_session(row, session)
@@ -304,72 +781,89 @@ export class Page extends ZenPage
cell.classList.add("sessions-selected");
}
- // Only rebuild the detail panel and log when the session changes
- if (!changed)
- {
- return;
- }
-
- // Rebuild detail panel
- const panel = this._detail_panel;
- panel.inner().replaceChildren();
-
- panel.tag("h3").text("Session Details");
-
- const props = new PropTable(panel);
- props.add_property("id", session.id || "-");
- props.add_property("appname", session.appname || "-");
- if (session.mode)
+ // Rebuild the bottom panel only when the selection actually changes.
+ if (changed)
{
- props.add_property("mode", session.mode);
+ this._show_session_panel(session);
}
- if (session.jobid)
- {
- props.add_property("jobid", session.jobid);
- }
- props.add_property("created", fmt_date(session.created_at));
- props.add_property("updated", fmt_date(session.updated_at));
- if (session.ended_at)
- {
- props.add_property("ended", fmt_date(session.ended_at));
- }
-
- if (session.metadata && Object.keys(session.metadata).length > 0)
- {
- panel.tag("h3").text("Metadata");
- const meta_props = new PropTable(panel);
- meta_props.add_object(session.metadata);
- }
-
- // Show log panel for this session
- this._show_log(session.id);
}
- _show_log(session_id)
+ _show_session_panel(session)
{
- // Stop any existing poll
- if (this._log_poll_timer)
+ // Unsubscribe from the previous session's log stream (if any)
+ // before we switch. The server treats a subsequent sub_log as a
+ // replacement, but an explicit unsub makes the intent clear and
+ // stops any in-flight pushes that could arrive as we're wiping
+ // the panel.
+ if (this._log_session_id && this._log_session_id !== session.id)
{
- clearInterval(this._log_poll_timer);
- this._log_poll_timer = null;
+ this._unsubscribe_log();
}
- this._log_session_id = session_id;
+ this._log_session_id = session.id;
this._log_cursor = 0; // monotonic cursor for incremental fetching
+ this._log_fetch_done = false; // gates _subscribe_log until replay resolves
this._log_follow = true;
this._log_newest_first = true;
+ this._log_filter = "";
- this._log_panel.inner().style.display = "";
- this._log_panel.inner().replaceChildren();
+ this._panel.inner().style.display = "";
+ this._panel.inner().replaceChildren();
- // Header
+ // Header with tab strip, filter, and log-view controls
const header = document.createElement("div");
header.className = "sessions-log-header";
- const title = document.createElement("span");
- title.className = "sessions-log-title";
- title.textContent = "Log";
- header.appendChild(title);
+ const tabs = document.createElement("div");
+ tabs.className = "sessions-panel-tabs";
+ header.appendChild(tabs);
+
+ const log_tab = document.createElement("button");
+ log_tab.type = "button";
+ log_tab.className = "sessions-panel-tab";
+ log_tab.textContent = "Log";
+ tabs.appendChild(log_tab);
+
+ const meta_tab = document.createElement("button");
+ meta_tab.type = "button";
+ meta_tab.className = "sessions-panel-tab";
+ meta_tab.textContent = "Metadata";
+ tabs.appendChild(meta_tab);
+
+ // Spacer sits between the tab strip and the right-hand controls so
+ // the Expand button stays flush right on both tabs (log_controls is
+ // hidden on Metadata — if the spacer lived there too, the button
+ // would jump left).
+ const spacer = document.createElement("span");
+ spacer.className = "sessions-log-spacer";
+ header.appendChild(spacer);
+
+ // Log-only controls: filter, newest-first, follow. Hidden when the
+ // Metadata tab is active since they don't apply there.
+ const log_controls = document.createElement("span");
+ log_controls.className = "sessions-log-controls";
+ header.appendChild(log_controls);
+
+ // Level filter: hides entries below the selected severity. Sits
+ // before the text filter since level is a coarser cut than text.
+ const level_select = document.createElement("select");
+ level_select.className = "sessions-log-level-filter";
+ level_select.title = "Hide log entries below this severity level";
+ for (const opt of LEVEL_FILTER_OPTIONS)
+ {
+ const o = document.createElement("option");
+ o.value = opt.value;
+ o.textContent = opt.label;
+ level_select.appendChild(o);
+ }
+ level_select.value = this._log_min_level_name;
+ level_select.addEventListener("change", () => {
+ this._log_min_level_name = level_select.value;
+ const opt = LEVEL_FILTER_OPTIONS.find(o => o.value === level_select.value);
+ this._log_min_level = opt ? opt.rank : -1;
+ this._apply_log_filter();
+ });
+ log_controls.appendChild(level_select);
const filter_input = document.createElement("input");
filter_input.type = "text";
@@ -379,12 +873,7 @@ export class Page extends ZenPage
this._log_filter = filter_input.value.toLowerCase();
this._apply_log_filter();
});
- header.appendChild(filter_input);
- this._log_filter = "";
-
- const spacer = document.createElement("span");
- spacer.style.flex = "1";
- header.appendChild(spacer);
+ log_controls.appendChild(filter_input);
const order_btn = document.createElement("button");
order_btn.className = "history-tab active";
@@ -394,7 +883,7 @@ export class Page extends ZenPage
order_btn.classList.toggle("active", this._log_newest_first);
this._reorder_log();
});
- header.appendChild(order_btn);
+ log_controls.appendChild(order_btn);
const follow_btn = document.createElement("button");
follow_btn.className = "history-tab active";
@@ -407,30 +896,113 @@ export class Page extends ZenPage
this._scroll_to_follow();
}
});
- header.appendChild(follow_btn);
+ log_controls.appendChild(follow_btn);
this._log_follow_btn = follow_btn;
- this._log_panel.inner().appendChild(header);
+ // Expand / collapse toggle: applies to the whole page layout (table
+ // vs log panel balance) so it lives outside log_controls and stays
+ // visible on both tabs. Double-chevron direction mirrors the way
+ // the panel grows — up when there's room to expand, down when
+ // expanded and ready to collapse back.
+ const expand_btn = document.createElement("button");
+ expand_btn.type = "button";
+ expand_btn.className = "history-tab sessions-panel-toggle";
+ const refresh_toggle = () => {
+ expand_btn.innerHTML = this._log_expanded ? ICON_CHEVRON_DOWN : ICON_CHEVRON_UP;
+ expand_btn.title = this._log_expanded
+ ? "Restore the sessions table"
+ : "Collapse the sessions table to focus on this session's log";
+ expand_btn.setAttribute("aria-label", this._log_expanded ? "Collapse log panel" : "Expand log panel");
+ expand_btn.classList.toggle("active", this._log_expanded);
+ };
+ refresh_toggle();
+ expand_btn.addEventListener("click", () => {
+ this._log_expanded = !this._log_expanded;
+ refresh_toggle();
+ if (this._last_sessions) { this._render_sessions(this._last_sessions); }
+ });
+ header.appendChild(expand_btn);
+
+ this._panel.inner().appendChild(header);
// Log body
- const body = document.createElement("div");
- body.className = "sessions-log-body";
- body.addEventListener("scroll", () => {
+ const log_body = document.createElement("div");
+ log_body.className = "sessions-log-body";
+ log_body.addEventListener("scroll", () => {
const at_follow_edge = this._log_newest_first
- ? (body.scrollTop <= 4)
- : (body.scrollTop + body.clientHeight >= body.scrollHeight - 4);
+ ? (log_body.scrollTop <= 4)
+ : (log_body.scrollTop + log_body.clientHeight >= log_body.scrollHeight - 4);
if (this._log_follow !== at_follow_edge)
{
this._log_follow = at_follow_edge;
this._log_follow_btn.classList.toggle("active", this._log_follow);
}
});
- this._log_panel.inner().appendChild(body);
- this._log_body = body;
+ this._panel.inner().appendChild(log_body);
+ this._log_body = log_body;
+
+ // Metadata/details body. Keep polling running regardless of which tab is
+ // visible so cursors stay fresh. Free-form metadata gets the primary
+ // left-hand panel; core session fields sit beside it on the right.
+ // Use .tag() so child panels are real Components — PropTable reaches into
+ // its parent's DOM element through the Component API.
+ const meta_body_widget = this._panel.tag().classify("sessions-metadata-body");
+ const meta_body = meta_body_widget.inner();
+ const meta_layout = meta_body_widget.tag().classify("sessions-metadata-layout");
+ const metadata_panel = meta_layout.tag().classify("sessions-metadata-panel");
+ const details_panel = meta_layout.tag().classify("sessions-metadata-panel").classify("sessions-metadata-core-panel");
+
+ const metadata_heading = document.createElement("div");
+ metadata_heading.className = "sessions-metadata-heading";
+ metadata_heading.textContent = "Metadata";
+ metadata_panel.inner().appendChild(metadata_heading);
+
+ const has_metadata = session.metadata && Object.keys(session.metadata).length > 0;
+ if (has_metadata)
+ {
+ const meta_props = new PropTable(metadata_panel);
+ meta_props.add_object(session.metadata);
+ }
+ else
+ {
+ const empty = document.createElement("div");
+ empty.className = "sessions-log-empty";
+ empty.textContent = "No metadata.";
+ metadata_panel.inner().appendChild(empty);
+ }
- // Initial fetch + start polling
- this._fetch_log();
- this._log_poll_timer = setInterval(() => this._fetch_log(), 2000);
+ const details_heading = document.createElement("div");
+ details_heading.className = "sessions-metadata-heading";
+ details_heading.textContent = "Session Information";
+ details_panel.inner().appendChild(details_heading);
+
+ const detail_props = new PropTable(details_panel);
+ detail_props.add_object(this._session_detail_metadata(session));
+
+ const set_active_tab = (tab) => {
+ this._active_tab = tab;
+ const is_log = tab === "log";
+ log_tab.classList.toggle("active", is_log);
+ meta_tab.classList.toggle("active", !is_log);
+ log_body.style.display = is_log ? "" : "none";
+ meta_body.style.display = is_log ? "none" : "";
+ log_controls.style.display = is_log ? "" : "none";
+ };
+ log_tab.addEventListener("click", () => set_active_tab("log"));
+ meta_tab.addEventListener("click", () => set_active_tab("meta"));
+ set_active_tab(this._active_tab || "log");
+
+ // Initial HTTP fetch gives us the full history in one shot; after
+ // it returns we hand off to the WebSocket for live deltas. Mark
+ // the panel as "fetch done" so _resubscribe_log (fired from
+ // ws.onopen) can avoid racing a too-early subscribe with
+ // cursor=0 that'd cause a duplicate flush. No more setInterval
+ // — pushes arrive the moment an entry is appended. See
+ // _handle_ws_frame.
+ this._fetch_log().then(() => {
+ this._log_fetch_done = true;
+ this._subscribe_log();
+ });
}
async _fetch_log()
@@ -453,10 +1025,10 @@ export class Page extends ZenPage
if (cursor < this._log_cursor)
{
- // Cursor went backwards — session was reset. Full re-render.
- this._log_cursor = 0;
- this._log_body.replaceChildren();
- this._fetch_log();
+ // Cursor went backwards — session was reset. Resync via
+ // the shared helper so the WS-frame and HTTP-fetch paths
+ // stay in lockstep.
+ await this._resync_log_from_zero();
return;
}
@@ -471,7 +1043,12 @@ export class Page extends ZenPage
this._show_log_empty();
}
}
- catch (e) { /* ignore */ }
+ catch (e)
+ {
+ // quietly() can't wrap an awaited body, so the catch is open-coded
+ // here — same debug-log policy as the sync paths above.
+ console.debug("[sessions] fetch log:", e);
+ }
}
_show_log_empty()
@@ -520,6 +1097,30 @@ export class Page extends ZenPage
}
}
+ // Cap DOM size. Drop the oldest lines from whichever end of the
+ // container holds them — that's the bottom in newest-first mode,
+ // the top in oldest-first mode. The user can no longer scroll
+ // further back than MAX_LOG_LINES_IN_DOM until they switch
+ // sessions and replay from cursor 0.
+ const overflow = body.children.length - MAX_LOG_LINES_IN_DOM;
+ if (overflow > 0)
+ {
+ if (this._log_newest_first)
+ {
+ for (let i = 0; i < overflow; i++)
+ {
+ body.removeChild(body.lastElementChild);
+ }
+ }
+ else
+ {
+ for (let i = 0; i < overflow; i++)
+ {
+ body.removeChild(body.firstElementChild);
+ }
+ }
+ }
+
if (this._log_follow)
{
this._scroll_to_follow();
@@ -572,12 +1173,56 @@ export class Page extends ZenPage
if (entry.level)
{
+ const key = entry.level.toLowerCase();
+ const rank = LEVEL_RANK[key];
+ // Stamp the rank so _line_passes_filters can check it later
+ // without re-parsing the text. Unknown levels leave it unset.
+ if (rank !== undefined) { line.dataset.levelRank = String(rank); }
const lvl = document.createElement("span");
- lvl.className = "sessions-log-level sessions-log-level-" + entry.level.toLowerCase();
+ lvl.className = "sessions-log-level sessions-log-level-" + key;
lvl.textContent = entry.level;
line.appendChild(lvl);
}
+ // Always render the logger column (even if empty) so the message
+ // column stays aligned across rows whether or not a category is set.
+ const cat = document.createElement("span");
+ cat.className = "sessions-log-logger";
+ if (entry.logger)
+ {
+ cat.textContent = entry.logger;
+ cat.title = entry.logger;
+ }
+ line.appendChild(cat);
+
+ // Marker for UE_LOGFMT structured entries. The server pre-renders
+ // `format` against `fields` into `message`, but both raw pieces ride
+ // along in the JSON so we can flag them visually and let a future UI
+ // hook in for field-level drill-down. Tooltip shows the raw template
+ // plus the arguments bag so you can see exactly what UE sent.
+ if (entry.format)
+ {
+ const fmt_marker = document.createElement("span");
+ fmt_marker.className = "sessions-log-fmt-marker";
+ fmt_marker.textContent = "{\u2026}";
+ let tooltip = "format: " + entry.format;
+ if (entry.fields && Object.keys(entry.fields).length > 0)
+ {
+ try
+ {
+ tooltip += "\nfields: " + JSON.stringify(entry.fields, null, 2);
+ }
+ catch (_e)
+ {
+ // Shouldn't happen for server-produced JSON, but guard
+ // against self-referential structures just in case.
+ tooltip += "\nfields: <unserializable>";
+ }
+ }
+ fmt_marker.title = tooltip;
+ line.appendChild(fmt_marker);
+ }
+
if (entry.message)
{
const msg = document.createElement("span");
@@ -586,20 +1231,36 @@ export class Page extends ZenPage
line.appendChild(msg);
}
- if (entry.data && Object.keys(entry.data).length > 0)
+ if (!this._line_passes_filters(line))
{
- const data_span = document.createElement("span");
- data_span.className = "sessions-log-data";
- data_span.textContent = JSON.stringify(entry.data);
- line.appendChild(data_span);
+ line.style.display = "none";
}
+ return line;
+ }
+
+ // Shared predicate for both the initial render (_create_log_line) and
+ // full sweeps (_apply_log_filter). Keeps the two paths in sync.
+ _line_passes_filters(line)
+ {
+ if (this._log_min_level >= 0)
+ {
+ const rank_str = line.dataset.levelRank;
+ // Entries without a known level rank pass through — they may
+ // carry info that shouldn't be silently dropped (e.g. the
+ // synthetic "session ended" line or legacy entries without a
+ // level field).
+ if (rank_str !== undefined && rank_str !== "")
+ {
+ const rank = Number(rank_str);
+ if (!Number.isNaN(rank) && rank < this._log_min_level) { return false; }
+ }
+ }
if (this._log_filter && !line.textContent.toLowerCase().includes(this._log_filter))
{
- line.style.display = "none";
+ return false;
}
-
- return line;
+ return true;
}
_apply_log_filter()
@@ -608,17 +1269,9 @@ export class Page extends ZenPage
{
return;
}
- const filter = this._log_filter;
for (const line of this._log_body.querySelectorAll(".sessions-log-line"))
{
- if (!filter || line.textContent.toLowerCase().includes(filter))
- {
- line.style.display = "";
- }
- else
- {
- line.style.display = "none";
- }
+ line.style.display = this._line_passes_filters(line) ? "" : "none";
}
}
}
diff --git a/src/zenserver/frontend/html/theme.js b/src/zenserver/frontend/html/theme.js
index 52ca116ab..7382d3ea0 100644
--- a/src/zenserver/frontend/html/theme.js
+++ b/src/zenserver/frontend/html/theme.js
@@ -4,18 +4,25 @@
// Persists choice in localStorage. Applies data-theme attribute on <html>.
(function() {
- var KEY = 'zen-theme';
-
- function getStored() {
- try { return localStorage.getItem(KEY); } catch (e) { return null; }
+ // Wrap localStorage so a single key's get/set/clear all swallow the
+ // SecurityError that fires in private-mode / cookies-disabled browsers.
+ // `clear` removes the key entirely (used for theme to reset to system
+ // preference); `set` stores the raw value passed (callers serialize).
+ function safeStorage(key) {
+ return {
+ get: function() {
+ try { return localStorage.getItem(key); } catch (e) { return null; }
+ },
+ set: function(value) {
+ try { localStorage.setItem(key, value); } catch (e) {}
+ },
+ clear: function() {
+ try { localStorage.removeItem(key); } catch (e) {}
+ },
+ };
}
- function setStored(value) {
- try {
- if (value) localStorage.setItem(KEY, value);
- else localStorage.removeItem(KEY);
- } catch (e) {}
- }
+ var themeStore = safeStorage('zen-theme');
function apply(theme) {
if (theme)
@@ -30,32 +37,53 @@
}
// Apply stored preference immediately (before paint)
- var stored = getStored();
- apply(stored);
+ apply(themeStore.get());
+
+ // Wide-mode preference: persisted across sessions, applied before paint
+ // so the layout doesn't flash at the default width on reload. Lifts the
+ // 1400px #container cap and the body's horizontal padding so the main
+ // content fills the viewport edge-to-edge.
+ var wideStore = safeStorage('zen-wide');
+ function getWide() { return wideStore.get() === 'true'; }
+ function setWide(value) {
+ if (value) wideStore.set('true');
+ else wideStore.clear();
+ }
+ function applyWide(wide) {
+ if (wide) document.documentElement.setAttribute('data-wide', 'true');
+ else document.documentElement.removeAttribute('data-wide');
+ }
+ applyWide(getWide());
+
+ // Double-chevron SVGs for the wide toggle — outward when content is
+ // narrow (click to fill the viewport), inward when wide (click to snap
+ // back to the 1400px cap). currentColor so button styles tint it.
+ var ICON_WIDEN = '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><polyline points="8 6 2 12 8 18"/><polyline points="16 6 22 12 16 18"/></svg>';
+ var ICON_NARROW = '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><polyline points="4 6 10 12 4 18"/><polyline points="20 6 14 12 20 18"/></svg>';
// Create toggle button once DOM is ready
function createToggle() {
var btn = document.createElement('button');
btn.id = 'zen_theme_toggle';
+ btn.className = 'zen-floating-toggle';
btn.title = 'Toggle theme';
function updateIcon() {
- var effective = getEffective(getStored());
+ var effective = getEffective(themeStore.get());
// Show sun in dark mode (click to go light), moon in light mode (click to go dark)
btn.textContent = effective === 'dark' ? '\u2600' : '\u263E';
- var isManual = getStored() != null;
+ var isManual = themeStore.get() != null;
btn.title = isManual
? 'Theme: ' + effective + ' (click to change, double-click for system)'
: 'Theme: system (click to change)';
}
btn.addEventListener('click', function() {
- var current = getStored();
- var effective = getEffective(current);
+ var effective = getEffective(themeStore.get());
// Toggle to the opposite
var next = effective === 'dark' ? 'light' : 'dark';
- setStored(next);
+ themeStore.set(next);
apply(next);
updateIcon();
});
@@ -63,26 +91,26 @@
btn.addEventListener('dblclick', function(e) {
e.preventDefault();
// Reset to system preference
- setStored(null);
+ themeStore.clear();
apply(null);
updateIcon();
});
// Update icon when system preference changes
window.matchMedia('(prefers-color-scheme: dark)').addEventListener('change', function() {
- if (!getStored()) updateIcon();
+ if (!themeStore.get()) updateIcon();
});
updateIcon();
document.body.appendChild(btn);
// WebSocket pause/play toggle
- var WS_KEY = 'zen-ws-paused';
+ var wsStore = safeStorage('zen-ws-paused');
var wsBtn = document.createElement('button');
wsBtn.id = 'zen_ws_toggle';
+ wsBtn.className = 'zen-floating-toggle';
- var initialPaused = false;
- try { initialPaused = localStorage.getItem(WS_KEY) === 'true'; } catch (e) {}
+ var initialPaused = wsStore.get() === 'true';
function updateWsIcon(paused) {
wsBtn.dataset.paused = paused ? 'true' : 'false';
@@ -92,21 +120,43 @@
updateWsIcon(initialPaused);
- // Fire initial event so pages pick up persisted state
- document.addEventListener('DOMContentLoaded', function() {
- if (initialPaused) {
- document.dispatchEvent(new CustomEvent('zen-ws-toggle', { detail: { paused: true } }));
- }
- });
+ // No initial event is dispatched: createToggle runs at (or after)
+ // DOMContentLoaded, so any listener gated on DOMContentLoaded would
+ // not fire. Page scripts read localStorage('zen-ws-paused') directly
+ // for their initial paused state and subscribe to zen-ws-toggle for
+ // subsequent transitions.
wsBtn.addEventListener('click', function() {
var paused = wsBtn.dataset.paused !== 'true';
- try { localStorage.setItem(WS_KEY, paused ? 'true' : 'false'); } catch (e) {}
+ wsStore.set(paused ? 'true' : 'false');
updateWsIcon(paused);
document.dispatchEvent(new CustomEvent('zen-ws-toggle', { detail: { paused: paused } }));
});
document.body.appendChild(wsBtn);
+
+ // Wide-mode toggle. Sits to the left of the pause and theme toggles.
+ var wideBtn = document.createElement('button');
+ wideBtn.id = 'zen_wide_toggle';
+ wideBtn.className = 'zen-floating-toggle';
+
+ function updateWideIcon(wide) {
+ wideBtn.dataset.wide = wide ? 'true' : 'false';
+ wideBtn.innerHTML = wide ? ICON_NARROW : ICON_WIDEN;
+ wideBtn.title = wide ? 'Narrow the main content' : 'Fill the viewport width';
+ wideBtn.setAttribute('aria-label', wide ? 'Narrow content' : 'Widen content');
+ }
+
+ updateWideIcon(getWide());
+
+ wideBtn.addEventListener('click', function() {
+ var wide = !getWide();
+ setWide(wide);
+ applyWide(wide);
+ updateWideIcon(wide);
+ });
+
+ document.body.appendChild(wideBtn);
}
if (document.readyState === 'loading')
diff --git a/src/zenserver/frontend/html/util/compactbinary.js b/src/zenserver/frontend/html/util/compactbinary.js
index 270c96a2f..bd5bf95b3 100644
--- a/src/zenserver/frontend/html/util/compactbinary.js
+++ b/src/zenserver/frontend/html/util/compactbinary.js
@@ -334,7 +334,7 @@ CbFieldView.prototype.clone = function()
{
const ret = new CbFieldView()
ret._type = this._type;
- ret._name = ret._name;
+ ret._name = this._name;
ret._data_view = new Uint8Array(this._data_view);
return ret;
}
@@ -352,8 +352,10 @@ CbObjectView.prototype[Symbol.iterator] = function()
var data_view = this.get_payload();
const [payload_size, varint_len] = VarInt.read_uint(data_view);
+ // Empty object — return a proper empty iterator, not a bare `{}` which
+ // would crash `for...of` with "undefined is not a function".
if (payload_size == 0)
- return {};
+ return [][Symbol.iterator]();
data_view = data_view.subarray(varint_len, payload_size + varint_len);
var uniform_type = CbFieldType.HasFieldType;
@@ -467,8 +469,10 @@ CbArrayView.prototype[Symbol.iterator] = function()
data_view = data_view.subarray(varint_len, payload_size + varint_len);
const item_count_bytes = VarInt.measure(data_view);
+ // Empty array — return a proper empty iterator, not a bare `{}` which
+ // would crash `for...of` with "undefined is not a function".
if (item_count_bytes >= payload_size)
- return {};
+ return [][Symbol.iterator]();
data_view = data_view.subarray(item_count_bytes);
var uniform_type = CbFieldType.HasFieldType;
diff --git a/src/zenserver/frontend/html/zen.css b/src/zenserver/frontend/html/zen.css
index d3c6c9036..46714a83d 100644
--- a/src/zenserver/frontend/html/zen.css
+++ b/src/zenserver/frontend/html/zen.css
@@ -2,64 +2,12 @@
/* theme -------------------------------------------------------------------- */
-/* system preference (default) */
-@media (prefers-color-scheme: light) {
- :root {
- --theme_g0: #1f2328;
- --theme_g1: #656d76;
- --theme_g2: #d0d7de;
- --theme_g3: #f6f8fa;
- --theme_g4: #ffffff;
-
- --theme_p0: #0969da;
- --theme_p4: #ddf4ff;
- --theme_p1: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 35%);
- --theme_p2: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 60%);
- --theme_p3: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 85%);
-
- --theme_ln: var(--theme_p0);
- --theme_er: #ffebe9;
-
- --theme_ok: #1a7f37;
- --theme_warn: #9a6700;
- --theme_fail: #cf222e;
-
- --theme_bright: #1f2328;
- --theme_faint: #6e7781;
- --theme_border_subtle: #d8dee4;
- --theme_highlight: #b8860b44;
- }
-}
-
-@media (prefers-color-scheme: dark) {
- :root {
- --theme_g0: #c9d1d9;
- --theme_g1: #8b949e;
- --theme_g2: #30363d;
- --theme_g3: #161b22;
- --theme_g4: #0d1117;
-
- --theme_p0: #58a6ff;
- --theme_p4: #1c2128;
- --theme_p1: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 35%);
- --theme_p2: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 60%);
- --theme_p3: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 85%);
-
- --theme_ln: #58a6ff;
- --theme_er: #1c1c1c;
-
- --theme_ok: #3fb950;
- --theme_warn: #d29922;
- --theme_fail: #f85149;
-
- --theme_bright: #f0f6fc;
- --theme_faint: #6e7681;
- --theme_border_subtle: #21262d;
- --theme_highlight: #e3b341aa;
- }
-}
-
-/* manual overrides (higher specificity than media queries) */
+/* Light tokens apply to the explicit data-theme="light" override and as the
+ default when no system preference matches the dark @media query below.
+ Dark tokens apply to data-theme="dark" and (when no explicit preference is
+ set) to dark system preference. Selector lists keep each token list defined
+ exactly once. */
+:root,
:root[data-theme="light"] {
--theme_g0: #1f2328;
--theme_g1: #656d76;
@@ -67,6 +15,10 @@
--theme_g3: #f6f8fa;
--theme_g4: #ffffff;
+ /* surface backgrounds: bg0 matches the body, bg1 is one step raised */
+ --theme_bg0: var(--theme_g4);
+ --theme_bg1: var(--theme_g3);
+
--theme_p0: #0969da;
--theme_p4: #ddf4ff;
--theme_p1: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 35%);
@@ -86,6 +38,32 @@
--theme_highlight: #b8860b44;
}
+@media (prefers-color-scheme: dark) {
+ :root:not([data-theme="light"]) {
+ --theme_g0: #c9d1d9;
+ --theme_g1: #8b949e;
+ --theme_g2: #30363d;
+ --theme_g3: #161b22;
+ --theme_g4: #0d1117;
+
+ --theme_p0: #58a6ff;
+ --theme_p4: #1c2128;
+
+ --theme_ln: #58a6ff;
+ --theme_er: #1c1c1c;
+
+ --theme_ok: #3fb950;
+ --theme_warn: #d29922;
+ --theme_fail: #f85149;
+
+ --theme_bright: #f0f6fc;
+ --theme_faint: #6e7681;
+ --theme_border_subtle: #21262d;
+ --theme_highlight: #e3b341aa;
+ }
+}
+
+/* Manual data-theme="dark" wins over system preference. */
:root[data-theme="dark"] {
--theme_g0: #c9d1d9;
--theme_g1: #8b949e;
@@ -95,9 +73,6 @@
--theme_p0: #58a6ff;
--theme_p4: #1c2128;
- --theme_p1: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 35%);
- --theme_p2: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 60%);
- --theme_p3: color-mix(in oklab, var(--theme_p0), var(--theme_p4) 85%);
--theme_ln: #58a6ff;
--theme_er: #1c1c1c;
@@ -114,10 +89,12 @@
/* theme toggle ------------------------------------------------------------- */
-#zen_ws_toggle {
+/* Shared shape for the fixed top-right utility buttons (theme, wide, ws-pause).
+ Per-button rules below add only the `right` offset and any glyph-specific
+ typography (font-size for emoji buttons, padding-zero for SVG buttons). */
+.zen-floating-toggle {
position: fixed;
top: 16px;
- right: 60px;
z-index: 10;
width: 36px;
height: 36px;
@@ -129,43 +106,48 @@
display: flex;
align-items: center;
justify-content: center;
- font-size: 18px;
- line-height: 1;
transition: color 0.15s, background 0.15s, border-color 0.15s;
user-select: none;
}
-#zen_ws_toggle:hover {
+.zen-floating-toggle:hover {
color: var(--theme_g0);
background: var(--theme_p4);
border-color: var(--theme_g1);
}
#zen_theme_toggle {
- position: fixed;
- top: 16px;
right: 16px;
- z-index: 10;
- width: 36px;
- height: 36px;
- border-radius: 6px;
- border: 1px solid var(--theme_g2);
- background: var(--theme_g3);
- color: var(--theme_g1);
- cursor: pointer;
- display: flex;
- align-items: center;
- justify-content: center;
font-size: 18px;
line-height: 1;
- transition: color 0.15s, background 0.15s, border-color 0.15s;
- user-select: none;
}
-#zen_theme_toggle:hover {
- color: var(--theme_g0);
- background: var(--theme_p4);
- border-color: var(--theme_g1);
+#zen_ws_toggle {
+ right: 60px;
+ font-size: 18px;
+ line-height: 1;
+}
+
+#zen_wide_toggle {
+ right: 104px;
+ padding: 0;
+}
+
+#zen_wide_toggle svg {
+ width: 18px;
+ height: 18px;
+ display: block;
+}
+
+/* Wide mode: lift the 1400px cap on the main container and drop the body's
+ horizontal padding so content fills the viewport edge-to-edge. Vertical
+ body padding stays so content doesn't touch the top of the viewport. */
+html[data-wide="true"] body {
+ padding-left: 0;
+ padding-right: 0;
+}
+html[data-wide="true"] #container {
+ max-width: none;
}
/* page --------------------------------------------------------------------- */
@@ -342,15 +324,39 @@ a {
height: calc(100vh - 80px);
}
-.sessions-layout {
+.sessions-header-row {
display: flex;
- gap: 1.5em;
- align-items: flex-start;
- flex-shrink: 0;
+ align-items: center;
+ gap: 12px;
+ margin-bottom: 8px;
+ flex-wrap: wrap;
+}
+
+.sessions-list-filter {
+ padding: 6px 12px;
+ font-size: 14px;
+ font-family: inherit;
+ border: 1px solid var(--theme_g2);
+ border-radius: 6px;
+ background: var(--theme_bg1);
+ color: var(--theme_bright);
+ outline: none;
+ width: 240px;
+ max-width: 100%;
+}
+.sessions-list-filter:focus {
+ border-color: var(--theme_ln);
+ background: var(--theme_bg0);
+}
+.sessions-list-filter::placeholder {
+ color: var(--theme_g1);
}
.sessions-table {
- flex: 1;
+ /* Natural height so the bottom panel sits right below the last row. The
+ section's column-flex lets .sessions-log-panel (flex: 1) absorb the
+ remaining vertical space for log viewing. */
+ flex-shrink: 0;
min-width: 0;
}
@@ -358,31 +364,157 @@ a {
text-align: right;
}
-.sessions-table .zen_table > div > div:nth-child(2) {
+/* appname (col 1), mode (col 2), platform (col 3) read better left-aligned */
+.sessions-table .zen_table > div > div:nth-child(1),
+.sessions-table .zen_table > div > div:nth-child(2),
+.sessions-table .zen_table > div > div:nth-child(3) {
+ text-align: left;
+}
+
+/* id (col 4) is a hex string — monospace */
+.sessions-table .zen_table > div > div:nth-child(4) {
font-family: 'SF Mono', 'Cascadia Mono', Consolas, 'DejaVu Sans Mono', monospace;
}
-.sessions-detail {
- width: 600px;
- flex-shrink: 0;
- font-size: 13px;
+/* Platform-column icon: sized to the table row height, picks up theme color
+ via currentColor. Unknown platforms fall back to a plain text label. */
+.platform-icon {
+ display: inline-flex;
+ align-items: center;
+ color: var(--theme_g0);
+ opacity: 0.85;
+}
+.platform-icon svg {
+ width: 16px;
+ height: 16px;
+ fill: currentColor;
}
-.sessions-detail h3 {
- margin: 0 0 0.6em 0;
- font-size: 13px;
- text-transform: uppercase;
- letter-spacing: 0.5px;
+/* Clickable column headers: the first row gets hover affordance. */
+.sessions-table .zen_table > div:first-child > div:hover {
+ color: var(--theme_g0);
+}
+.sessions-table .zen_table > div:first-child > div.sessions-sort-active {
+ color: var(--theme_ln);
+}
+
+.sessions-group-toggle,
+.sessions-group-toggle-spacer,
+.sessions-group-child-spacer {
+ display: inline-flex;
+ align-items: center;
+ justify-content: center;
+ width: 18px;
+ margin-right: 4px;
color: var(--theme_g1);
+ font-family: 'SF Mono', 'Cascadia Mono', Consolas, 'DejaVu Sans Mono', monospace;
}
-.sessions-detail .zen_table {
- margin-bottom: 1em;
+.sessions-group-toggle {
+ border: 0;
+ padding: 0;
+ background: transparent;
+ cursor: pointer;
+}
+
+.sessions-group-toggle:hover {
+ color: var(--theme_ln);
+}
+
+.sessions-child-row {
+ background-color: color-mix(in srgb, var(--theme_bg1) 75%, transparent);
+}
+
+.sessions-child-row:first-child {
+ padding-left: 14px;
}
-.sessions-detail-placeholder {
+/* Bottom-panel tab strip (Log / Metadata). Lives inside
+ .sessions-log-header alongside the log-view controls. */
+.sessions-panel-tabs {
+ display: flex;
+ gap: 2px;
+}
+.sessions-panel-tab {
+ background: transparent;
+ border: none;
+ border-bottom: 2px solid transparent;
+ padding: 4px 10px;
color: var(--theme_g1);
- font-style: italic;
+ font: inherit;
+ font-size: 12px;
+ font-weight: 600;
+ text-transform: uppercase;
+ letter-spacing: 0.5px;
+ cursor: pointer;
+}
+.sessions-panel-tab:hover {
+ color: var(--theme_g0);
+}
+.sessions-panel-tab.active {
+ color: var(--theme_ln);
+ border-bottom-color: var(--theme_ln);
+}
+
+/* Log-only controls (filter, newest-first, follow). Wrapped so we can hide
+ them as a group when the Metadata tab is active. Natural width — the
+ sibling .sessions-log-spacer does the pushing. */
+.sessions-log-controls {
+ display: flex;
+ align-items: center;
+ gap: 8px;
+}
+
+/* Flex spacer between the tab strip and the right-hand controls. Keeps the
+ Expand button flush right even when log_controls is hidden. */
+.sessions-log-spacer {
+ flex: 1;
+}
+
+.sessions-metadata-body {
+ padding: 10px 12px;
+ overflow-y: auto;
+ flex: 1;
+ min-height: 0;
+}
+
+.sessions-metadata-layout {
+ display: grid;
+ grid-template-columns: minmax(0, 1fr) minmax(280px, max-content);
+ gap: 16px;
+ align-items: start;
+}
+
+.sessions-metadata-panel {
+ min-width: 0;
+}
+
+.sessions-metadata-core-panel {
+ border-left: 1px solid var(--theme_g3);
+ padding-left: 16px;
+}
+
+.sessions-metadata-heading {
+ margin: 10px 0 6px;
+ color: var(--theme_ln);
+ font-weight: 600;
+}
+
+.sessions-metadata-heading:first-child {
+ margin-top: 0;
+}
+
+@media (max-width: 900px) {
+ .sessions-metadata-layout {
+ grid-template-columns: 1fr;
+ }
+
+ .sessions-metadata-core-panel {
+ border-left: 0;
+ border-top: 1px solid var(--theme_g3);
+ padding-left: 0;
+ padding-top: 10px;
+ }
}
.sessions-selected {
@@ -415,7 +547,6 @@ a {
color: var(--theme_g1);
}
.sessions-log-filter {
- margin-left: 12px;
padding: 6px 12px;
font-size: 14px;
font-family: inherit;
@@ -433,6 +564,37 @@ a {
.sessions-log-filter::placeholder {
color: var(--theme_g1);
}
+.sessions-log-level-filter {
+ padding: 4px 8px;
+ font-size: 12px;
+ font-family: inherit;
+ border: 1px solid var(--theme_g2);
+ border-radius: 6px;
+ background: var(--theme_bg1);
+ color: var(--theme_bright);
+ outline: none;
+ cursor: pointer;
+}
+.sessions-log-level-filter:focus {
+ border-color: var(--theme_ln);
+}
+
+/* Chevron-icon variant of .history-tab for the log-panel expand toggle.
+ Overrides the text-button padding/letterspacing so the icon sits in a
+ roughly square pill. Double-chevron up when collapsed, down when
+ expanded — see ICON_CHEVRON_* in sessions.js. */
+.sessions-panel-toggle {
+ display: inline-flex;
+ align-items: center;
+ justify-content: center;
+ padding: 4px 8px;
+ letter-spacing: 0;
+}
+.sessions-panel-toggle svg {
+ width: 14px;
+ height: 14px;
+ display: block;
+}
.sessions-log-body {
flex: 1;
min-height: 0;
@@ -474,29 +636,49 @@ a {
font-weight: 600;
}
.sessions-log-level-info { color: var(--theme_ln); }
-.sessions-log-level-warn { color: #d29922; }
-.sessions-log-level-error { color: #f85149; }
+.sessions-log-level-warn { color: var(--theme_warn); }
+.sessions-log-level-error { color: var(--theme_fail); }
.sessions-log-level-debug { color: var(--theme_g1); }
+.sessions-log-logger {
+ flex-shrink: 0;
+ width: 12em;
+ color: var(--theme_ln);
+ opacity: 0.75;
+ overflow: hidden;
+ text-overflow: ellipsis;
+ white-space: nowrap;
+}
.sessions-log-msg {
white-space: pre-wrap;
word-break: break-all;
}
-.sessions-log-data {
- color: var(--theme_g1);
- white-space: pre-wrap;
- word-break: break-all;
+.sessions-log-fmt-marker {
+ flex-shrink: 0;
+ color: var(--theme_ln);
+ opacity: 0.6;
+ font-weight: 600;
+ cursor: help;
+ font-family: 'SF Mono', 'Cascadia Mono', Consolas, 'DejaVu Sans Mono', monospace;
}
-.sessions-self-pill {
+.sessions-pill {
display: inline-block;
font-size: 0.7em;
font-weight: 600;
padding: 1px 6px;
- margin-right: 6px;
+ margin-left: 6px;
border-radius: 8px;
+ vertical-align: middle;
+ text-transform: uppercase;
+ letter-spacing: 0.3px;
+}
+.sessions-self-pill {
background-color: var(--theme_p4);
color: var(--theme_g0);
- vertical-align: middle;
+}
+.sessions-log-indicator-pill {
+ background-color: var(--theme_g2);
+ color: var(--theme_g0);
}
.objectstore-bucket-detail {
@@ -535,6 +717,17 @@ a {
opacity: 0.7;
}
+/* Pager lives in the header row, to the right of the filter input. No
+ top margin since it's on the same baseline as tabs/filter. */
+.sessions-header-pager {
+ display: flex;
+ align-items: center;
+ gap: 6px;
+}
+.sessions-header-pager:empty {
+ display: none;
+}
+
/* expandable cell ---------------------------------------------------------- */
.zen_expand_icon {
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index a2a366a80..e9749afe8 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -981,10 +981,14 @@ ZenHubServer::Run()
SetNewState(kRunning);
- OnReady();
-
+ // Register the self-session and replay the backlog into it BEFORE
+ // OnReady disables the backlog — otherwise the in-proc session sink
+ // attaches against a disabled backlog and shows nothing from the
+ // startup window.
StartSelfSession("zenhub");
+ OnReady();
+
m_Http->Run(IsInteractiveMode);
SetNewState(kShuttingDown);
diff --git a/src/zenserver/proxy/zenproxyserver.cpp b/src/zenserver/proxy/zenproxyserver.cpp
index ffa9a4295..b3ce208c3 100644
--- a/src/zenserver/proxy/zenproxyserver.cpp
+++ b/src/zenserver/proxy/zenproxyserver.cpp
@@ -383,10 +383,14 @@ ZenProxyServer::Run()
SetNewState(kRunning);
- OnReady();
-
+ // Register the self-session and replay the backlog into it BEFORE
+ // OnReady disables the backlog — otherwise the in-proc session sink
+ // attaches against a disabled backlog and shows nothing from the
+ // startup window.
StartSelfSession("zenproxy");
+ OnReady();
+
m_Http->Run(IsInteractiveMode);
SetNewState(kShuttingDown);
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 88db36828..1678ede60 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -7,8 +7,17 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/trace.h>
+#include "logtemplate.h"
#include "sessions.h"
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_list.h>
+#include <EASTL/fixed_vector.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <limits>
+
namespace zen {
using namespace std::literals;
@@ -21,13 +30,21 @@ HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
, m_StatsService(StatsService)
, m_Sessions(Sessions)
, m_PushTimer(IoContext)
+, m_CleanupTimer(IoContext)
+, m_LivenessTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ // Break the callback edge before tearing anything else down so a
+ // late AppendLog on another thread can't fire BroadcastLogAppended
+ // after our subscriber list is gone.
+ m_Sessions.SetLogAppendedCallback({});
m_PushTimer.cancel();
+ m_CleanupTimer.cancel();
+ m_LivenessTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -135,12 +152,36 @@ HttpSessionsService::Initialize()
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+ // Event-driven log push: the service fires this every time an entry
+ // is appended (including the synthetic "session ended" line emitted
+ // by RemoveSession). Subscribers receive a binary CB frame carrying
+ // the delta. Safe to call BroadcastLogAppended from any thread — it
+ // does its own locking and SendBinary is async-queued by the WS
+ // transport.
+ m_Sessions.SetLogAppendedCallback([this](const Oid& SessionId, uint64_t NewCursor) { BroadcastLogAppended(SessionId, NewCursor); });
+
EnqueuePushTimer();
+
+ // Run a cleanup pass shortly after startup so freshly-loaded historical
+ // data is pruned even if the server doesn't stay up for an hour.
+ m_CleanupTimer.expires_after(std::chrono::seconds(30));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+
+ EnqueueLivenessTimer();
}
static void
-WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
+WriteSessionInfo(CbWriter& Writer, const SessionsService::Session& Session)
{
+ const SessionsService::SessionInfo& Info = Session.Info();
+
Writer << "id" << Info.Id;
if (!Info.AppName.empty())
{
@@ -150,6 +191,18 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "mode" << Info.Mode;
}
+ if (!Info.Platform.empty())
+ {
+ Writer << "platform" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
@@ -161,6 +214,11 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
Writer << "ended_at" << Info.EndedAt;
}
+ if (const uint64_t LogCount = Session.GetLogCount(); LogCount > 0)
+ {
+ Writer << "log_count" << LogCount;
+ }
+
if (Info.Metadata.GetSize() > 0)
{
Writer.AddObject("metadata"sv, Info.Metadata);
@@ -182,13 +240,13 @@ HttpSessionsService::BuildSessionListResponse()
for (const Ref<SessionsService::Session>& Session : Active)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
for (const Ref<SessionsService::Session>& Session : Ended)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -231,7 +289,7 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
Response.EndObject();
}
Response.EndArray();
@@ -262,24 +320,51 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
{
CbObject RequestObject = ServerRequest.ReadPayloadObject();
+ // Render the id into a stack buffer once for any success-reply
+ // paths below — avoids a std::string per POST/PUT.
+ char IdBuf[Oid::StringLength + 1] = {};
+ SessionId.ToString(IdBuf);
+ const std::string_view IdStr(IdBuf, Oid::StringLength);
+
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
std::string Mode(RequestObject["mode"sv].AsString());
- Oid JobId = RequestObject["jobid"sv].AsObjectId();
- CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+ std::string Platform(RequestObject["platform"sv].AsString());
+ Oid ParentSessionId = RequestObject["parent_session_id"sv].AsObjectId();
+ Oid JobId = RequestObject["jobid"sv].AsObjectId();
+ CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
+
+ // Only trust a client-reported pid when the HTTP layer
+ // says the request is local (unix socket or a loopback
+ // TCP peer). A remote client's pid refers to a different
+ // machine's process table — opening a local handle with
+ // it would at best be meaningless, at worst a liveness
+ // false positive.
+ uint32_t ClientPid = 0;
+ if (ServerRequest.IsLocalMachineRequest())
+ {
+ ClientPid = RequestObject["pid"sv].AsUInt32();
+ }
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId,
+ std::move(AppName),
+ std::move(Mode),
+ std::move(Platform),
+ ClientPid,
+ ParentSessionId,
+ JobId,
+ MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, IdStr);
}
else
{
// Already exists - try update instead
if (m_Sessions.UpdateSession(SessionId, MetadataView))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError);
}
@@ -290,7 +375,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionWriteCount++;
if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView()))
{
- return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId));
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, IdStr);
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
HttpContentType::kText,
@@ -304,7 +389,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
if (Session)
{
CbObjectWriter Response;
- WriteSessionInfo(Response, Session->Info());
+ WriteSessionInfo(Response, *Session);
return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
}
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -312,7 +397,7 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kDelete:
{
m_SessionsStats.SessionDeleteCount++;
- if (m_Sessions.RemoveSession(SessionId))
+ if (m_Sessions.RemoveSession(SessionId, "client request"sv))
{
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -334,17 +419,33 @@ static void
WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry)
{
Writer << "timestamp" << Entry.Timestamp;
- if (!Entry.Level.empty())
+ if (Entry.Level != logging::Off)
{
- Writer << "level" << Entry.Level;
+ // Frontend renders on the string form (CSS class derives from it), so
+ // keep the wire format as the canonical lowercase name.
+ Writer << "level" << logging::ToString(Entry.Level);
}
- if (!Entry.Message.empty())
+ const std::string_view LoggerName{Entry.LoggerName};
+ if (!LoggerName.empty())
{
- Writer << "message" << Entry.Message;
+ Writer << "logger" << LoggerName;
}
- if (Entry.Data.GetSize() > 0)
+ const std::string_view Message{Entry.Message};
+ if (!Message.empty())
{
- Writer.AddObject("data"sv, Entry.Data);
+ Writer << "message" << Message;
+ }
+ // Structured-log form alongside the rendered message so a future UI
+ // can offer field-level drill-down without another schema bump. The
+ // existing UI only looks at "message" and is unaffected.
+ const std::string_view Format{Entry.Format};
+ if (!Format.empty())
+ {
+ Writer << "format" << Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("fields"sv, Entry.Fields);
+ }
}
}
@@ -378,12 +479,21 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (ServerRequest.RequestContentType() == HttpContentType::kText)
{
- // Raw text - split by newlines, one entry per line
+ // Raw text - split by newlines, one entry per line. Collect
+ // into a batch and append atomically: keeps a single client's
+ // payload contiguous on the wire even when other clients race
+ // in, and fires the WS push observer just once for the whole
+ // batch instead of once per line.
IoBuffer Payload = ServerRequest.ReadPayload();
std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
const DateTime Now = DateTime::Now();
- size_t Pos = 0;
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views point into the request payload
+ // (Text), which lives for the duration of this handler.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+ size_t Pos = 0;
while (Pos < Text.size())
{
size_t End = Text.find('\n', Pos);
@@ -401,60 +511,115 @@ HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
if (!Line.empty())
{
- Session->AppendLog(SessionsService::LogEntry{
+ Batch.push_back(SessionsService::LogEntryInput{
.Timestamp = Now,
- .Message = std::string(Line),
+ .Message = Line,
});
}
Pos = End + 1;
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
else
{
- // Structured log (JSON or CbObject)
- // Accepts a single record or an "entries" array of records
+ // Structured log (JSON or CbObject). Accepts a single record
+ // or an "entries" array of records — collect into a batch so
+ // a single POST lands atomically and fires one WS push.
CbObject RequestObject = ServerRequest.ReadPayloadObject();
const DateTime Now = DateTime::Now();
+ // 64 inline slots covers the typical SendLogBatch posting size
+ // (~50) without touching the heap. Spills to heap beyond that.
+ // LogEntryInput's string_views borrow from the parsed
+ // RequestObject's underlying buffer (the logger / message /
+ // format strings on the wire); we keep RequestObject alive
+ // for the whole intake.
+ eastl::fixed_vector<SessionsService::LogEntryInput, 64> Batch;
+
+ // Stable backing for messages we render from a structured
+ // template. fixed_list never moves nodes on insertion, so
+ // string_views into these strings stay valid until the list
+ // is destroyed at handler exit. 64 inline nodes match the
+ // batch's fixed-vector inline cap; spills to heap if a POST
+ // brings more.
+ eastl::fixed_list<std::string, 64> RenderedMessages;
+
auto AppendFromObject = [&](CbObjectView Obj) {
- CbFieldView LevelField = Obj["level"sv];
- std::string_view Level;
+ CbFieldView LevelField = Obj["level"sv];
+ logging::LogLevel Level = logging::Off;
if (LevelField.IsString())
{
- Level = LevelField.AsString();
+ Level = logging::ParseLogLevelString(LevelField.AsString());
}
else if (LevelField.IsInteger())
{
int32_t LevelInt = LevelField.AsInt32();
if (LevelInt >= 0 && LevelInt < logging::LogLevelCount)
{
- Level = logging::ToString(static_cast<logging::LogLevel>(LevelInt));
+ Level = static_cast<logging::LogLevel>(LevelInt);
}
}
- std::string Message(Obj["message"sv].AsString());
- CbObjectView DataView = Obj["data"sv].AsObjectView();
-
- Session->AppendLog(SessionsService::LogEntry{
- .Timestamp = Now,
- .Level = std::string(Level),
- .Message = std::move(Message),
- .Data = CbObject::Clone(DataView),
+ const std::string_view LoggerName = Obj["logger"sv].AsString();
+
+ // Two entry shapes. Structured entries carry `format` +
+ // `fields` and no `message` — we render the template right
+ // here so the rest of the pipeline (in-memory deque,
+ // persisted log.bin, UI GET response) keeps working the
+ // same way for both shapes.
+ CbFieldView FormatField = Obj["format"sv];
+ if (FormatField.IsString())
+ {
+ const std::string_view FormatView = FormatField.AsString();
+ CbObjectView FieldsView = Obj["fields"sv].AsObjectView();
+ ExtendableStringBuilder<256> RenderedBuilder;
+ RenderLogTemplate(FormatView, FieldsView, RenderedBuilder);
+
+ // Anchor the rendered string in the stable list so the
+ // LogEntryInput's view into it stays valid until the
+ // AppendLogBatch call below.
+ RenderedMessages.emplace_back(RenderedBuilder.ToView());
+ const std::string& StoredRendered = RenderedMessages.back();
+
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = StoredRendered,
+ .Format = FormatView,
+ .Fields = CbObject::Clone(FieldsView),
+ });
+ return;
+ }
+
+ // Plain entry.
+ Batch.push_back(SessionsService::LogEntryInput{
+ .Timestamp = Now,
+ .Level = Level,
+ .LoggerName = LoggerName,
+ .Message = Obj["message"sv].AsString(),
});
};
CbFieldView EntriesField = RequestObject["entries"sv];
if (EntriesField.IsArray())
{
- for (CbFieldView Entry : EntriesField)
+ // Pre-reserve so the 50-ish entries from a typical
+ // SendLogBatch don't trigger 4-5 reallocations as the
+ // vector grows.
+ CbArrayView Arr = EntriesField.AsArrayView();
+ Batch.reserve(Arr.Num());
+ for (CbFieldView Entry : Arr)
{
AppendFromObject(Entry.AsObjectView());
}
}
else
{
+ Batch.reserve(1);
AppendFromObject(RequestObject);
}
+ m_Sessions.AppendLogBatch(SessionId, Batch);
}
return ServerRequest.WriteResponse(HttpResponseCode::OK);
@@ -547,13 +712,78 @@ HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::s
{
ZEN_UNUSED(RelativeUri);
ZEN_INFO("Sessions WebSocket client connected");
- m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+ const uint64_t NewId = m_NextSubscriberId.fetch_add(1, std::memory_order_relaxed);
+ m_WsConnectionsLock.WithExclusiveLock(
+ [&] { m_WsConnections.push_back(WsSubscriber{.Connection = std::move(Connection), .Id = NewId}); });
}
void
-HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg)
{
- // No client-to-server messages expected
+ // Expected client→server protocol is JSON text frames; see
+ // sessions.js → _ws_send. Binary frames and malformed JSON are logged
+ // at debug and ignored so a confused client can't disturb others.
+ if (Msg.Opcode != WebSocketOpcode::kText)
+ {
+ return;
+ }
+ std::string_view PayloadText(static_cast<const char*>(Msg.Payload.GetData()), Msg.Payload.GetSize());
+ std::string ParseError;
+ json11::Json Parsed = json11::Json::parse(std::string(PayloadText), ParseError);
+ if (!ParseError.empty() || !Parsed.is_object())
+ {
+ ZEN_DEBUG("Ignoring malformed WebSocket frame: {}", ParseError.empty() ? "not an object" : ParseError);
+ return;
+ }
+
+ const std::string& Type = Parsed["type"].string_value();
+ if (Type == "sub_log")
+ {
+ const Oid SessionId = Oid::TryFromHexString(Parsed["session"].string_value());
+ if (SessionId == Oid::Zero)
+ {
+ ZEN_DEBUG("sub_log with invalid session id '{}'", Parsed["session"].string_value());
+ return;
+ }
+ // json11 reports int via int_value() (32-bit); cursors fit easily
+ // inside a session's lifetime so this is fine for the foreseeable
+ // future. Negative values are treated as 0.
+ const int CursorRaw = Parsed["cursor"].int_value();
+ const uint64_t Cursor = CursorRaw > 0 ? static_cast<uint64_t>(CursorRaw) : 0;
+
+ // Record the subscription and fire an immediate delta so we don't
+ // drop entries that landed between the client's HTTP replay and
+ // this frame. See BroadcastLogAppended for the broadcast flow.
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.SubscribedSessionId = SessionId;
+ Sub.LastSentCursor = Cursor;
+ break;
+ }
+ }
+ });
+ // Pass UINT64_MAX to force a flush even if the cursor hasn't
+ // advanced — the subscriber's LastSentCursor may already lag the
+ // tail (e.g. rapid posts before the client subscribed).
+ BroadcastLogAppended(SessionId, std::numeric_limits<uint64_t>::max());
+ }
+ else if (Type == "unsub_log")
+ {
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (Sub.Connection.Get() == &Conn)
+ {
+ Sub.Unsubscribe();
+ break;
+ }
+ }
+ });
+ }
+ // Unknown types are silently ignored so the protocol can grow.
}
void
@@ -561,8 +791,8 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
{
ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
m_WsConnectionsLock.WithExclusiveLock([&] {
- auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
- return C.Get() == &Conn;
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const WsSubscriber& Sub) {
+ return Sub.Connection.Get() == &Conn;
});
m_WsConnections.erase(It, m_WsConnections.end());
});
@@ -571,8 +801,15 @@ HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]
void
HttpSessionsService::BroadcastSessions()
{
- std::vector<Ref<WebSocketConnection>> Connections;
- m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+ // 8 inline slots covers any realistic number of concurrent UI tabs;
+ // spills to heap beyond that.
+ eastl::fixed_vector<Ref<WebSocketConnection>, 8> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] {
+ for (const WsSubscriber& Sub : m_WsConnections)
+ {
+ Connections.push_back(Sub.Connection);
+ }
+ });
if (Connections.empty())
{
@@ -593,6 +830,107 @@ HttpSessionsService::BroadcastSessions()
}
void
+HttpSessionsService::BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor)
+{
+ Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId);
+ if (!Session)
+ {
+ // Session vanished (e.g. pruned) between the append and the
+ // broadcast. No entries to ship.
+ return;
+ }
+
+ // Claim each subscriber's cursor and snapshot its delta atomically under
+ // the exclusive WS lock. Doing claim+fetch+cursor-bump together — rather
+ // than snapshot-shared / fetch-unlocked / bump-exclusive — closes the
+ // race where two concurrent BroadcastLogAppended calls would both
+ // observe the same FromCursor, fetch overlapping ranges, and ship the
+ // subscriber duplicate entries. Sends still happen after the lock is
+ // released to avoid holding it across async socket I/O.
+ struct PendingSend
+ {
+ Ref<WebSocketConnection> Connection;
+ SessionsService::Session::CursorResult Delta;
+ bool InitialSend; // true when FromCursor == 0
+ };
+ // 8 inline slots keeps the broadcast allocation-free for the typical UI
+ // case (1-2 tabs tailing one session); spills to heap if many clients
+ // happen to subscribe to the same session at once.
+ eastl::fixed_vector<PendingSend, 8> Sends;
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ for (WsSubscriber& Sub : m_WsConnections)
+ {
+ if (!Sub.IsSubscribedTo(SessionId))
+ {
+ continue;
+ }
+ // Cheap gate: if the subscriber already has everything up to
+ // NewCursor, skip. Sub_log uses UINT64_MAX to force a flush.
+ if (NewCursor != std::numeric_limits<uint64_t>::max() && Sub.LastSentCursor >= NewCursor)
+ {
+ continue;
+ }
+ if (!Sub.Connection->IsOpen())
+ {
+ continue;
+ }
+ const uint64_t FromCursor = Sub.LastSentCursor;
+ SessionsService::Session::CursorResult Delta = Session->GetLogEntriesAfter(FromCursor);
+ Sub.LastSentCursor = Delta.Cursor;
+ Sends.push_back({Sub.Connection, std::move(Delta), FromCursor == 0});
+ }
+ });
+ if (Sends.empty())
+ {
+ return;
+ }
+
+ // Render the hex id into a stack buffer — CbWriter only needs a
+ // string_view, so we avoid the 24-byte std::string allocation that
+ // Oid::ToString() would otherwise do on every broadcast. The buffer
+ // is StringLength + 1 because ToString writes a trailing NUL beyond
+ // the 24 hex chars; the view itself excludes the NUL.
+ char HexSessionIdBuf[Oid::StringLength + 1];
+ SessionId.ToString(HexSessionIdBuf);
+ const std::string_view HexSessionId(HexSessionIdBuf, Oid::StringLength);
+ for (const PendingSend& Send : Sends)
+ {
+ if (Send.Delta.Entries.empty() && !Send.InitialSend)
+ {
+ // Nothing new and the subscriber was primed — nothing to send.
+ continue;
+ }
+
+ // Binary CB frame — the client already has a CB parser
+ // (util/compactbinary.js). CB keeps structured entries typed end-
+ // to-end (hashes, ints, dates stay that way on the wire) and skips
+ // JSON escaping overhead on every append. Shape mirrors the HTTP
+ // GET response plus two routing fields (type + session). A fresh
+ // CbObjectWriter per iteration is required because the ctor calls
+ // BeginObject() to set up the implicit outer object — Save() then
+ // finalizes that object, leaving the writer in a state that
+ // Reset() doesn't restore.
+ CbObjectWriter Response;
+ Response << "type"sv
+ << "log"sv;
+ Response << "session"sv << HexSessionId;
+ Response << "cursor"sv << Send.Delta.Cursor;
+ Response << "count"sv << Send.Delta.Count;
+ Response.BeginArray("entries"sv);
+ for (const SessionsService::LogEntry& Entry : Send.Delta.Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ CbObject Obj = Response.Save();
+ Send.Connection->SendBinary(Obj.GetView());
+ }
+}
+
+void
HttpSessionsService::EnqueuePushTimer()
{
m_PushTimer.expires_after(std::chrono::seconds(2));
@@ -607,4 +945,82 @@ HttpSessionsService::EnqueuePushTimer()
});
}
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic cleanup of expired / excess sessions
+//
+
+void
+HttpSessionsService::RunCleanup()
+{
+ const TimeSpan MaxAge = TimeSpan(SessionsService::kDefaultMaxSessionAgeDays, 0, 0, 0);
+ const size_t MaxCount = SessionsService::kDefaultMaxSessionCount;
+ const uint64_t MaxBytes = SessionsService::kDefaultMaxStorageBytes;
+ const SessionsService::PruneResult Result = m_Sessions.PruneExpired(MaxAge, MaxCount, MaxBytes);
+ if (Result.ExpiredByAge + Result.ExpiredByCount + Result.ExpiredByStorage > 0)
+ {
+ ZEN_INFO("Sessions cleanup: pruned {} by age, {} by count, {} by storage (max {} days, max {} sessions, max {} MiB)",
+ Result.ExpiredByAge,
+ Result.ExpiredByCount,
+ Result.ExpiredByStorage,
+ SessionsService::kDefaultMaxSessionAgeDays,
+ MaxCount,
+ MaxBytes / (1024 * 1024));
+ }
+}
+
+void
+HttpSessionsService::EnqueueCleanupTimer()
+{
+ m_CleanupTimer.expires_after(std::chrono::hours(1));
+ m_CleanupTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunCleanup();
+ EnqueueCleanupTimer();
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Periodic liveness check for tracked local client processes
+//
+
+void
+HttpSessionsService::RunLivenessCheck()
+{
+ const size_t EndedByDeadClient = m_Sessions.CheckProcessLiveness();
+ if (EndedByDeadClient > 0)
+ {
+ ZEN_INFO("Sessions liveness: ended {} session(s) whose client process had exited", EndedByDeadClient);
+ }
+ else
+ {
+ // Debug-level so this doesn't spam at info every 30s, but lets an
+ // operator who's specifically investigating why their crashed
+ // session didn't clean up see whether anything is being tracked.
+ ZEN_DEBUG("Sessions liveness: no dead client processes found");
+ }
+}
+
+void
+HttpSessionsService::EnqueueLivenessTimer()
+{
+ // 30s strikes a balance between crash-detection latency and
+ // per-session OpenProcess/GetExitCode overhead. Active sessions with
+ // no reported pid (remote clients) are skipped in the inner loop so
+ // the cost scales with local sessions only.
+ m_LivenessTimer.expires_after(std::chrono::seconds(30));
+ m_LivenessTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+ RunLivenessCheck();
+ EnqueueLivenessTimer();
+ });
+}
+
} // namespace zen
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h
index 6ebe61c8d..2c0185176 100644
--- a/src/zenserver/sessions/httpsessions.h
+++ b/src/zenserver/sessions/httpsessions.h
@@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
+
namespace zen {
class SessionsService;
@@ -69,14 +71,64 @@ private:
SessionsStats m_SessionsStats;
metrics::OperationTiming m_HttpRequests;
- // WebSocket push
- RwLock m_WsConnectionsLock;
- std::vector<Ref<WebSocketConnection>> m_WsConnections;
- asio::steady_timer m_PushTimer;
+ // WebSocket push.
+ //
+ // Each connection can subscribe to a single session's log stream. The
+ // subscription is optional (SubscribedSessionId == Oid::Zero means
+ // "session-list broadcasts only"). LastSentCursor is the cursor value
+ // most recently delivered for the subscribed session; the broadcaster
+ // uses it to pull the correct delta from the service.
+ //
+ // Id is a process-monotonic generation token assigned at OnWebSocket-
+ // Open. Pointer matching is fine for OnWebSocketMessage /
+ // OnWebSocketClose where the live `WebSocketConnection&` parameter is
+ // unambiguous; Id-based matching is reserved for any future code path
+ // that wants to refer to a subscriber across lock releases without
+ // risking a slot-reuse mix-up.
+ struct WsSubscriber
+ {
+ Ref<WebSocketConnection> Connection;
+ uint64_t Id = 0;
+ Oid SubscribedSessionId = Oid::Zero;
+ uint64_t LastSentCursor = 0;
+
+ // True iff the subscriber is currently subscribed to `Session`.
+ // Centralizes the (SubscribedSessionId != Oid::Zero) sentinel
+ // check that broadcaster, cursor-bump path, and any future filter
+ // would otherwise each open-code.
+ bool IsSubscribedTo(const Oid& Session) const { return SubscribedSessionId != Oid::Zero && SubscribedSessionId == Session; }
+
+ // Drop the active subscription. After this returns, IsSubscribedTo
+ // is false for every session id.
+ void Unsubscribe()
+ {
+ SubscribedSessionId = Oid::Zero;
+ LastSentCursor = 0;
+ }
+ };
+ RwLock m_WsConnectionsLock;
+ std::vector<WsSubscriber> m_WsConnections;
+ std::atomic_uint64_t m_NextSubscriberId{1}; // 0 reserved as "not yet assigned"
+ asio::steady_timer m_PushTimer;
void BroadcastSessions();
void EnqueuePushTimer();
+ // Event-driven log push. Called from SessionsService's log-appended
+ // callback; iterates subscribers of the given session and ships any
+ // entries they haven't seen yet.
+ void BroadcastLogAppended(const Oid& SessionId, uint64_t NewCursor);
+
+ // Periodic cleanup of old / excess ended sessions
+ asio::steady_timer m_CleanupTimer;
+ void EnqueueCleanupTimer();
+ void RunCleanup();
+
+ // Periodic client-process liveness check for locally-connected sessions.
+ asio::steady_timer m_LivenessTimer;
+ void EnqueueLivenessTimer();
+ void RunLivenessCheck();
+
Oid m_SelfSessionId = Oid::Zero;
CbObject BuildSessionListResponse();
diff --git a/src/zenserver/sessions/inprocsessionlogsink.cpp b/src/zenserver/sessions/inprocsessionlogsink.cpp
index 04c5f7312..c935522bc 100644
--- a/src/zenserver/sessions/inprocsessionlogsink.cpp
+++ b/src/zenserver/sessions/inprocsessionlogsink.cpp
@@ -12,28 +12,31 @@ static constexpr uint64_t UnixEpochBiasSeconds = uint64_t(double(1970 - 1) * 365
static DateTime
TimePointToDateTime(logging::LogClock::time_point Time)
{
- auto Duration = Time.time_since_epoch();
- auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration);
- uint64_t Ticks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond;
- return DateTime{Ticks};
+ // DateTime ticks are 100 ns each. Splitting the time_point into whole-second
+ // and sub-second parts and converting both lets us preserve sub-second
+ // precision; the previous implementation truncated to seconds, which made
+ // every entry land at .000 ms in tail / dashboard renderings.
+ auto Duration = Time.time_since_epoch();
+ auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration);
+ auto SubSecondNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(Duration - Seconds);
+ uint64_t SecondsTicks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond;
+ uint64_t SubSecondTicks = static_cast<uint64_t>(SubSecondNanos.count()) / static_cast<uint64_t>(TimeSpan::NanosecondsPerTick);
+ return DateTime{SecondsTicks + SubSecondTicks};
}
void
InProcSessionLogSink::Log(const logging::LogMessage& Msg)
{
- Ref<SessionsService::Session> Session = m_Service.GetSession(m_SessionId);
- if (!Session)
- {
- return;
- }
-
- SessionsService::LogEntry Entry{
- .Timestamp = TimePointToDateTime(Msg.GetTime()),
- .Level = std::string(logging::ToString(Msg.GetLevel())),
- .Message = std::string(Msg.GetPayload()),
- };
-
- Session->AppendLog(std::move(Entry));
+ // Route through the service-level AppendLog so the log-appended
+ // callback fires — otherwise WS subscribers tailing the self-session
+ // don't see in-proc lines until they reload and re-fetch via HTTP.
+ m_Service.AppendLog(m_SessionId,
+ SessionsService::LogEntryInput{
+ .Timestamp = TimePointToDateTime(Msg.GetTime()),
+ .Level = Msg.GetLevel(),
+ .LoggerName = Msg.GetLoggerName(),
+ .Message = Msg.GetPayload(),
+ });
}
} // namespace zen
diff --git a/src/zenserver/sessions/logtemplate.cpp b/src/zenserver/sessions/logtemplate.cpp
new file mode 100644
index 000000000..b4d8f37e8
--- /dev/null
+++ b/src/zenserver/sessions/logtemplate.cpp
@@ -0,0 +1,390 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "logtemplate.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/guid.h>
+#include <zencore/iohash.h>
+#include <zencore/string.h>
+#include <zencore/uid.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+using namespace std::literals;
+
+namespace {
+
+ // Bounded recursion so pathological nesting (e.g. an object that references
+ // itself through $format) can't stack-overflow the server. Depth counts
+ // every nested template expansion OR value descent.
+ constexpr size_t kMaxRecursionDepth = 16;
+
+ void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth);
+ void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth);
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Path resolution: walk a UE field_path — `name` followed by zero or
+ // more `.name` / `[N]` segments — starting from the fields root. Returns
+ // an empty field on any miss.
+ //
+
+ CbFieldView ResolvePath(CbObjectView Root, std::string_view Path)
+ {
+ CbFieldView Cur;
+ bool Entered = false; // have we applied at least one segment?
+
+ const auto ApplyName = [&](std::string_view Name) -> bool {
+ if (Name.empty())
+ {
+ return false;
+ }
+ if (!Entered)
+ {
+ Cur = Root[Name];
+ }
+ else
+ {
+ if (!Cur.IsObject())
+ {
+ return false;
+ }
+ Cur = Cur.AsObjectView()[Name];
+ }
+ Entered = true;
+ return Cur.operator bool();
+ };
+
+ const auto ApplyIndex = [&](uint64_t Idx) -> bool {
+ if (!Entered || !Cur.IsArray())
+ {
+ return false;
+ }
+ uint64_t N = 0;
+ for (CbFieldView Elem : Cur.AsArrayView().CreateViewIterator())
+ {
+ if (N == Idx)
+ {
+ Cur = Elem;
+ return Cur.operator bool();
+ }
+ ++N;
+ }
+ return false;
+ };
+
+ size_t i = 0;
+ while (i < Path.size())
+ {
+ const char C = Path[i];
+ if (C == '.')
+ {
+ ++i;
+ continue;
+ }
+ if (C == '[')
+ {
+ const size_t End = Path.find(']', i + 1);
+ if (End == std::string_view::npos)
+ {
+ return {};
+ }
+ uint64_t Idx = 0;
+ for (size_t j = i + 1; j < End; ++j)
+ {
+ const char D = Path[j];
+ if (D < '0' || D > '9')
+ {
+ return {};
+ }
+ Idx = Idx * 10 + uint64_t(D - '0');
+ }
+ if (!ApplyIndex(Idx))
+ {
+ return {};
+ }
+ i = End + 1;
+ continue;
+ }
+ // Name segment: run until the next '.' or '['.
+ const size_t NameStart = i;
+ while (i < Path.size() && Path[i] != '.' && Path[i] != '[')
+ {
+ ++i;
+ }
+ if (!ApplyName(Path.substr(NameStart, i - NameStart)))
+ {
+ return {};
+ }
+ }
+ return Entered ? Cur : CbFieldView{};
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Primitive rendering. Uses natural string forms for each CbField type.
+ // Non-string values are emitted without quotes — the caller (the JSON-ish
+ // fallback) adds quotes only around string values.
+ //
+
+ void RenderPrimitive(CbFieldView Field, StringBuilderBase& Out)
+ {
+ if (Field.IsString())
+ {
+ Out << Field.AsString();
+ return;
+ }
+ if (Field.IsBool())
+ {
+ Out << (Field.AsBool() ? "true"sv : "false"sv);
+ return;
+ }
+ if (Field.IsInteger())
+ {
+ Out << Field.AsInt64();
+ return;
+ }
+ if (Field.IsFloat())
+ {
+ // format_to into the builder directly — avoids the std::string
+ // fmt::format would otherwise build just to hand to Append.
+ fmt::format_to(StringBuilderAppender(Out), "{}", Field.AsDouble());
+ return;
+ }
+ if (Field.IsDateTime())
+ {
+ Out.Append(Field.AsDateTime().ToIso8601());
+ return;
+ }
+ if (Field.IsObjectId())
+ {
+ // ToString(char[]) writes the 24-char hex into a caller buffer;
+ // the std::string overload would allocate.
+ char Buf[Oid::StringLength + 1] = {};
+ Field.AsObjectId().ToString(Buf);
+ Out << std::string_view(Buf, Oid::StringLength);
+ return;
+ }
+ if (Field.IsHash())
+ {
+ // Appender overload writes the 40-char hex directly into the
+ // builder; the std::string overload would allocate.
+ Field.AsHash().ToHexString(Out);
+ return;
+ }
+ if (Field.IsUuid())
+ {
+ Guid G = Field.AsUuid();
+ G.ToString(Out);
+ return;
+ }
+ if (Field.IsNull())
+ {
+ Out << "null"sv;
+ return;
+ }
+ // Binary / attachment / custom / unknown → emit nothing rather than
+ // a stream of garbage bytes.
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // JSON-ish fallback for bare objects / nested arrays. Compact single-line
+ // with quoted string keys and string values, raw other types. Intended
+ // for debug display — not strictly RFC-8259 JSON.
+ //
+
+ void AppendJsonishString(std::string_view S, StringBuilderBase& Out)
+ {
+ Out << '"';
+ for (char C : S)
+ {
+ switch (C)
+ {
+ case '"':
+ Out << "\\\""sv;
+ break;
+ case '\\':
+ Out << "\\\\"sv;
+ break;
+ case '\n':
+ Out << "\\n"sv;
+ break;
+ case '\r':
+ Out << "\\r"sv;
+ break;
+ case '\t':
+ Out << "\\t"sv;
+ break;
+ default:
+ Out << C;
+ break;
+ }
+ }
+ Out << '"';
+ }
+
+ void AppendJsonishValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Field.IsString())
+ {
+ AppendJsonishString(Field.AsString(), Out);
+ return;
+ }
+ // Non-string leaves and nested objects/arrays go through RenderValue
+ // so object short-circuits ($text / $format / ...) still apply.
+ RenderValue(Field, Out, Localized, Depth);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Value rendering (the decision tree from the plan).
+ //
+
+ void RenderValue(CbFieldView Field, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Depth >= kMaxRecursionDepth)
+ {
+ Out << "…"sv;
+ return;
+ }
+ if (Field.IsObject())
+ {
+ CbObjectView Obj = Field.AsObjectView();
+
+ if (CbFieldView Text = Obj["$text"sv]; Text.IsString())
+ {
+ Out << Text.AsString();
+ return;
+ }
+ if (CbFieldView Format = Obj["$format"sv]; Format.IsString())
+ {
+ RenderTemplateInto(Format.AsString(), Obj, Out, /*Localized=*/false, Depth + 1);
+ return;
+ }
+ if (CbFieldView LocFormat = Obj["$locformat"sv]; LocFormat.IsString())
+ {
+ RenderTemplateInto(LocFormat.AsString(), Obj, Out, /*Localized=*/true, Depth + 1);
+ return;
+ }
+
+ // Bare object — JSON-ish fallback.
+ Out << '{';
+ bool First = true;
+ for (CbFieldView Entry : Obj.CreateViewIterator())
+ {
+ if (!First)
+ {
+ Out << ", "sv;
+ }
+ First = false;
+ AppendJsonishString(Entry.GetName(), Out);
+ Out << ": "sv;
+ AppendJsonishValue(Entry, Out, Localized, Depth + 1);
+ }
+ Out << '}';
+ return;
+ }
+ if (Field.IsArray())
+ {
+ Out << '[';
+ bool First = true;
+ for (CbFieldView Elem : Field.AsArrayView().CreateViewIterator())
+ {
+ if (!First)
+ {
+ Out << ", "sv;
+ }
+ First = false;
+ AppendJsonishValue(Elem, Out, Localized, Depth + 1);
+ }
+ Out << ']';
+ return;
+ }
+ RenderPrimitive(Field, Out);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Template tokenizer + renderer.
+ //
+
+ void RenderTemplateInto(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized, size_t Depth)
+ {
+ if (Depth >= kMaxRecursionDepth)
+ {
+ Out << "…"sv;
+ return;
+ }
+
+ size_t i = 0;
+ while (i < Template.size())
+ {
+ const char C = Template[i];
+
+ // Localized escape: ` followed by {, }, or ` → literal.
+ if (Localized && C == '`' && i + 1 < Template.size())
+ {
+ const char Next = Template[i + 1];
+ if (Next == '{' || Next == '}' || Next == '`')
+ {
+ Out << Next;
+ i += 2;
+ continue;
+ }
+ }
+
+ // Non-localized escape: {{ or }} → literal { or }.
+ if (!Localized && C == '{' && i + 1 < Template.size() && Template[i + 1] == '{')
+ {
+ Out << '{';
+ i += 2;
+ continue;
+ }
+ if (!Localized && C == '}' && i + 1 < Template.size() && Template[i + 1] == '}')
+ {
+ Out << '}';
+ i += 2;
+ continue;
+ }
+
+ if (C == '{')
+ {
+ // Placeholder: scan until matching '}'.
+ const size_t End = Template.find('}', i + 1);
+ if (End == std::string_view::npos)
+ {
+ // Unterminated placeholder — emit the rest literally so we
+ // don't silently drop data. UE would have asserted at emit.
+ Out << Template.substr(i);
+ return;
+ }
+ const std::string_view Path = Template.substr(i + 1, End - i - 1);
+ const CbFieldView Resolved = ResolvePath(Fields, Path);
+ if (Resolved)
+ {
+ RenderValue(Resolved, Out, Localized, Depth + 1);
+ }
+ // Missing placeholder: emit nothing. (UE asserts at emit time,
+ // so in well-formed input this never fires.)
+ i = End + 1;
+ continue;
+ }
+
+ Out << C;
+ ++i;
+ }
+ }
+
+} // namespace
+
+void
+RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized)
+{
+ RenderTemplateInto(Template, Fields, Out, Localized, 0);
+}
+
+} // namespace zen
diff --git a/src/zenserver/sessions/logtemplate.h b/src/zenserver/sessions/logtemplate.h
new file mode 100644
index 000000000..e8b07e63d
--- /dev/null
+++ b/src/zenserver/sessions/logtemplate.h
@@ -0,0 +1,42 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/string.h>
+
+#include <string_view>
+
+namespace zen {
+
+/// Render a UE structured-log template (as produced by UE_LOGFMT) against a
+/// fields bag, writing the result into a caller-provided builder. Grammar:
+///
+/// format := (text | escape | placeholder)*
+/// escape := '{{' | '}}' (non-localized, default)
+/// | '`{' | '`}' | '``' (localized, $locformat)
+/// placeholder := '{' field_path '}'
+/// field_path := name ('.' name | '[' digits ']')*
+/// name := [A-Za-z0-9] [A-Za-z0-9_]* (leading '_' reserved)
+///
+/// There are NO inline format specs — `{Name:spec}` is not part of the
+/// grammar. Formatting control lives on the value side via nested
+/// objects carrying `$text` / `$format` / `$locformat` (see the value-
+/// rendering rules in logtemplate.cpp).
+///
+/// Missing paths render as empty (UE asserts these at emit time, so in
+/// practice they don't occur; the empty-render is defensive). Unknown
+/// primitive CbField types render as empty.
+///
+/// Typical use: pass an `ExtendableStringBuilder<256>` so typical messages
+/// render on the stack with no heap allocation. The builder is appended to,
+/// not cleared, so callers can compose multiple writes if they want.
+///
+/// @param Template The format string.
+/// @param Fields The top-level fields bag referenced by placeholders.
+/// @param Out Builder to append the rendered text to.
+/// @param Localized True for $locformat templates (backtick escapes);
+/// false (default) for the top-level `format` field.
+void RenderLogTemplate(std::string_view Template, CbObjectView Fields, StringBuilderBase& Out, bool Localized = false);
+
+} // namespace zen
diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp
index 9d4e3120c..470117c6a 100644
--- a/src/zenserver/sessions/sessions.cpp
+++ b/src/zenserver/sessions/sessions.cpp
@@ -3,59 +3,733 @@
#include "sessions.h"
#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <mutex>
+
namespace zen {
using namespace std::literals;
+namespace {
+
+ // Per-session log file layout:
+ // [LogFileHeader][Record]*
+ // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength].
+ // Records are written in order. A partial trailing record (short write after
+ // crash) is ignored on load.
+ constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG'
+ constexpr uint32_t kLogFileVersion = 1;
+
+#pragma pack(push, 1)
+ struct LogFileHeader
+ {
+ uint32_t Magic;
+ uint32_t Version;
+ };
+#pragma pack(pop)
+
+ constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader);
+
+ void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry)
+ {
+ Writer << "ts" << Entry.Timestamp;
+ if (Entry.Level != logging::Off)
+ {
+ // Store as a small integer (int8_t range) rather than a string —
+ // fixed-width, one byte in the serialized CbObject.
+ Writer << "lvl" << static_cast<int32_t>(Entry.Level);
+ }
+ if (!Entry.LoggerName.empty())
+ {
+ Writer << "cat" << Entry.LoggerName;
+ }
+ if (!Entry.Message.empty())
+ {
+ Writer << "msg" << Entry.Message;
+ }
+ // Structured-log template + fields. Only present for UE_LOGFMT-shaped
+ // entries; Message already holds the rendered text for those so a
+ // reader that ignores these two can still display the line.
+ if (!Entry.Format.empty())
+ {
+ Writer << "fmt" << Entry.Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("flds"sv, Entry.Fields);
+ }
+ }
+ }
+
+ // Parse a serialized record into an input form. The string_views in
+ // the result borrow from `Obj`'s underlying buffer; the caller must
+ // keep that buffer alive (or arena-copy via PreloadEntries) before
+ // the views are used.
+ bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput)
+ {
+ CbFieldView TsField = Obj["ts"sv];
+ if (!TsField)
+ {
+ return false;
+ }
+ OutInput.Timestamp = TsField.AsDateTime();
+
+ // New format: integer. Legacy format (pre-refactor log.bin files on
+ // this same branch): string. Accept either so existing persisted
+ // entries keep their level when loaded.
+ CbFieldView LvlField = Obj["lvl"sv];
+ if (LvlField.IsInteger())
+ {
+ const int32_t Lvl = LvlField.AsInt32();
+ if (Lvl >= 0 && Lvl < logging::LogLevelCount)
+ {
+ OutInput.Level = static_cast<logging::LogLevel>(Lvl);
+ }
+ }
+ else if (LvlField.IsString())
+ {
+ OutInput.Level = logging::ParseLogLevelString(LvlField.AsString());
+ }
+
+ OutInput.LoggerName = Obj["cat"sv].AsString();
+ OutInput.Message = Obj["msg"sv].AsString();
+ OutInput.Format = Obj["fmt"sv].AsString();
+ if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0)
+ {
+ OutInput.Fields = CbObject::Clone(FieldsView);
+ }
+ return true;
+ }
+
+ void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info)
+ {
+ Writer << "id" << Info.Id;
+ if (!Info.AppName.empty())
+ {
+ Writer << "app" << Info.AppName;
+ }
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
+ if (!Info.Platform.empty())
+ {
+ Writer << "plat" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
+ if (Info.JobId != Oid::Zero)
+ {
+ Writer << "jobid" << Info.JobId;
+ }
+ Writer << "ca" << Info.CreatedAt;
+ Writer << "ua" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ea" << Info.EndedAt;
+ }
+ if (Info.Metadata.GetSize() > 0)
+ {
+ Writer.AddObject("meta"sv, Info.Metadata);
+ }
+ }
+
+ bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo)
+ {
+ OutInfo.Id = Obj["id"sv].AsObjectId();
+ if (OutInfo.Id == Oid::Zero)
+ {
+ return false;
+ }
+ OutInfo.AppName = std::string(Obj["app"sv].AsString());
+ OutInfo.Mode = std::string(Obj["mode"sv].AsString());
+ OutInfo.Platform = std::string(Obj["plat"sv].AsString());
+ OutInfo.ClientPid = Obj["pid"sv].AsUInt32();
+ OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId();
+ OutInfo.JobId = Obj["jobid"sv].AsObjectId();
+ OutInfo.CreatedAt = Obj["ca"sv].AsDateTime();
+ OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime();
+ OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0});
+ CbObjectView MetaView = Obj["meta"sv].AsObjectView();
+ if (MetaView.GetSize() > 0)
+ {
+ OutInfo.Metadata = CbObject::Clone(MetaView);
+ }
+ return true;
+ }
+
+ std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); }
+
+#if ZEN_PLATFORM_WINDOWS
+ // Turn a Windows process exit code into a human-friendly termination
+ // reason. Most abnormal terminations surface as NTSTATUS values (high
+ // bit set); the ones below are what you'll actually encounter in the
+ // wild. Anything we don't recognize falls through to a formatted hex
+ // (NTSTATUS-shaped) or decimal (application-level) exit code.
+ std::string DescribeWindowsExitCode(uint32_t ExitCode)
+ {
+ struct Named
+ {
+ uint32_t Code;
+ std::string_view Name;
+ };
+ using namespace std::literals;
+ static constexpr Named kKnown[] = {
+ {0xC000013Au, "interrupted (Ctrl-C)"sv},
+ {0xC0000005u, "access violation"sv},
+ {0xC000001Du, "illegal instruction"sv},
+ {0xC0000094u, "integer divide by zero"sv},
+ {0xC0000096u, "privileged instruction"sv},
+ {0xC00000FDu, "stack overflow"sv},
+ {0xC0000409u, "stack buffer overrun"sv},
+ {0xC0000374u, "heap corruption"sv},
+ {0xC0000135u, "DLL not found"sv},
+ {0xC0000142u, "DLL initialization failed"sv},
+ {0xC000007Bu, "invalid image format"sv},
+ {0xC0000420u, "assertion failure"sv},
+ {0xC0000008u, "invalid handle"sv},
+ {0xC000008Eu, "float divide by zero"sv},
+ {0xC0000091u, "float overflow"sv},
+ {0xC0000093u, "float underflow"sv},
+ {0x80000003u, "breakpoint"sv},
+ {0x40000015u, "fatal app exit"sv},
+ };
+ for (const Named& Entry : kKnown)
+ {
+ if (Entry.Code == ExitCode)
+ {
+ return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode);
+ }
+ }
+ // NTSTATUS-shaped codes have the high bit set; show them as hex so
+ // they're recognizable (and matchable against Microsoft's doc tables).
+ if ((ExitCode & 0x80000000u) != 0)
+ {
+ return fmt::format("process exited (exit code 0x{:08X})", ExitCode);
+ }
+ return fmt::format("process exited (exit code {})", ExitCode);
+ }
+#endif
+
+} // namespace
+
class SessionLog : public TRefCounted<SessionLog>
{
public:
- SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); }
+ explicit SessionLog(std::filesystem::path LogFilePath);
+
+ void Append(const SessionsService::LogEntryInput& Entry);
+
+ // LoadTail returns input-form entries: their string_views borrow
+ // from m_OwnedBuffers (held internally) so the caller's PreloadEntries
+ // can intern/copy them into the session arena before the buffers are
+ // dropped at LoadResult destruction.
+ struct LoadResult
+ {
+ std::vector<SessionsService::LogEntryInput> TailEntries;
+ // Backing memory for the views in TailEntries. Each ParsedRecord
+ // keeps a CbObject alive whose payload bytes back the strings.
+ std::vector<CbObject> OwnedBuffers;
+ uint64_t TotalCount = 0;
+ };
+ LoadResult LoadTail(size_t MaxEntries);
private:
- BasicFile m_LogFile;
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
+ std::filesystem::path m_Path;
+ std::mutex m_Mutex;
+ BasicFile m_File;
+ uint64_t m_WriteOffset = 0;
+ bool m_Enabled = false;
};
-class SessionLogStore
+SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath))
{
-public:
- SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {}
+ std::error_code Ec;
+ std::filesystem::create_directories(m_Path.parent_path(), Ec);
- ~SessionLogStore() = default;
+ m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
- Ref<SessionLog> GetLogForSession(const Oid& SessionId)
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ m_File.Close();
+ ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message());
+ return;
}
- Ref<SessionLog> CreateLogForSession(const Oid& SessionId)
+ LogFileHeader Header{};
+ bool NeedsInit = Size < kLogFileHeaderSize;
+ if (!NeedsInit)
+ {
+ // Read is throwing-only; guard so a read failure doesn't escape.
+ try
+ {
+ m_File.Read(&Header, sizeof(Header), 0);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what());
+ NeedsInit = true;
+ }
+ if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion))
+ {
+ NeedsInit = true;
+ }
+ }
+
+ if (NeedsInit)
+ {
+ m_File.SetFileSize(0);
+ Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion};
+ m_File.Write(&Header, sizeof(Header), 0, Ec);
+ if (Ec)
+ {
+ m_File.Close();
+ ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
+ m_WriteOffset = kLogFileHeaderSize;
+ }
+ else
+ {
+ m_WriteOffset = Size;
+ }
+
+ m_Enabled = true;
+}
+
+void
+SessionLog::Append(const SessionsService::LogEntryInput& Entry)
+{
+ if (!m_Enabled)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ return;
}
+ CbObjectWriter Writer;
+ WriteLogEntryFields(Writer, Entry);
+ CbObject Obj = Writer.Save();
+
+ // Write directly from the CbObject's owned buffer — no need to allocate
+ // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write.
+ const MemoryView View = Obj.GetView();
+ const uint64_t ObjSize = View.GetSize();
+ if (ObjSize == 0 || ObjSize > std::numeric_limits<uint32_t>::max())
+ {
+ return;
+ }
+ const uint32_t Len = static_cast<uint32_t>(ObjSize);
+
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ std::error_code Ec;
+ m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_WriteOffset += sizeof(Len) + ObjSize;
+}
+
+SessionLog::LoadResult
+SessionLog::LoadTail(size_t MaxEntries)
+{
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ LoadResult Result;
+
+ if (!m_Enabled)
+ {
+ return Result;
+ }
+
+ std::error_code Ec;
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec || Size <= kLogFileHeaderSize)
+ {
+ return Result;
+ }
+
+ IoBuffer Buffer;
+ try
+ {
+ Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what());
+ return Result;
+ }
+ const uint8_t* Data = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ const uint64_t DataSize = Buffer.GetSize();
+
+ // Walk all valid record positions, ignoring any partial trailing record.
+ struct RecRef
+ {
+ const uint8_t* Ptr;
+ uint32_t Len;
+ };
+ std::vector<RecRef> Records;
+ uint64_t Pos = 0;
+ while (Pos + sizeof(uint32_t) <= DataSize)
+ {
+ uint32_t RecLen = 0;
+ std::memcpy(&RecLen, Data + Pos, sizeof(RecLen));
+ const uint64_t Next = Pos + sizeof(uint32_t) + RecLen;
+ if (RecLen == 0 || Next > DataSize)
+ {
+ break;
+ }
+ Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen});
+ Pos = Next;
+ }
+
+ // Parse every record so TotalCount reflects how many actually decode
+ // — cursors anchor to this number, and counting CB-corrupt records
+ // would shift subsequent cursor math off. Only the trailing window
+ // of size MaxEntries is materialized into TailEntries; head records
+ // are parsed purely for the count and discarded.
+ const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0;
+ Result.TailEntries.reserve(Records.size() - TailStart);
+ Result.OwnedBuffers.reserve(Records.size() - TailStart);
+
+ for (size_t i = 0; i < Records.size(); ++i)
+ {
+ try
+ {
+ IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len));
+ CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf));
+
+ SessionsService::LogEntryInput Input;
+ if (ReadLogEntry(Obj, Input))
+ {
+ ++Result.TotalCount;
+ if (i >= TailStart)
+ {
+ // Keep the CbObject alive for as long as the views
+ // in the input are needed — PreloadEntries will copy
+ // the strings into the session arena and we drop the
+ // buffer set when LoadResult is destroyed.
+ Result.TailEntries.push_back(std::move(Input));
+ Result.OwnedBuffers.push_back(std::move(Obj));
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ // Skip malformed record — does not contribute to TotalCount.
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class SessionLogStore
+{
+public:
+ explicit SessionLogStore(std::filesystem::path StoragePath);
+
+ Ref<SessionLog> GetOrCreateLogForSession(const Oid& SessionId);
+ void WriteSessionInfoFile(const SessionsService::SessionInfo& Info);
+ void DeleteSession(const Oid& SessionId);
+ uint64_t GetSessionSize(const Oid& SessionId) const;
+
+ struct PersistedSession
+ {
+ SessionsService::SessionInfo Info;
+ std::filesystem::path LogPath;
+ };
+ std::vector<PersistedSession> Scan() const;
+
private:
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
std::filesystem::path m_StoragePath;
};
-SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info)
+SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath))
{
+ std::error_code Ec;
+ std::filesystem::create_directories(m_StoragePath, Ec);
+}
+
+Ref<SessionLog>
+SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+ return Ref(new SessionLog(Dir / "log.bin"));
+}
+
+void
+SessionLogStore::DeleteSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::remove_all(Dir, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message());
+ }
+}
+
+uint64_t
+SessionLogStore::GetSessionSize(const Oid& SessionId) const
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ uint64_t Total = 0;
+ std::filesystem::directory_iterator It{Dir, Ec};
+ if (Ec)
+ {
+ return 0;
+ }
+ for (const std::filesystem::directory_entry& Entry : It)
+ {
+ std::error_code FileEc;
+ if (Entry.is_regular_file(FileEc))
+ {
+ const uintmax_t Size = Entry.file_size(FileEc);
+ if (!FileEc)
+ {
+ Total += uint64_t(Size);
+ }
+ }
+ }
+ return Total;
}
-SessionsService::Session::~Session() = default;
void
-SessionsService::Session::AppendLog(LogEntry Entry)
+SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info)
{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+
+ CbObjectWriter Writer;
+ WriteSessionInfoFields(Writer, Info);
+ CbObject Obj = Writer.Save();
+ const MemoryView View = Obj.GetView();
+ if (View.GetSize() == 0)
+ {
+ return;
+ }
+ TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec);
+}
+
+std::vector<SessionLogStore::PersistedSession>
+SessionLogStore::Scan() const
+{
+ std::vector<PersistedSession> Result;
+ std::error_code Ec;
+
+ if (!std::filesystem::exists(m_StoragePath, Ec))
+ {
+ return Result;
+ }
+
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec})
+ {
+ if (Ec || !Entry.is_directory(Ec))
+ {
+ continue;
+ }
+
+ const std::filesystem::path InfoPath = Entry.path() / "info.cb";
+ const std::filesystem::path LogPath = Entry.path() / "log.bin";
+
+ if (!std::filesystem::exists(InfoPath, Ec))
+ {
+ continue;
+ }
+
+ try
+ {
+ BasicFile InfoFile;
+ std::error_code OpenEc;
+ InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc);
+ if (OpenEc)
+ {
+ continue;
+ }
+ IoBuffer InfoBuf = InfoFile.ReadAll();
+ if (InfoBuf.GetSize() == 0)
+ {
+ continue;
+ }
+ CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf));
+ PersistedSession PS{.Info = SessionsService::SessionInfo{
+ .Id = Oid::Zero,
+ .CreatedAt = DateTime{0},
+ .UpdatedAt = DateTime{0},
+ }};
+ if (!ReadSessionInfo(InfoObj, PS.Info))
+ {
+ continue;
+ }
+ PS.LogPath = LogPath;
+ Result.push_back(std::move(PS));
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what());
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+SessionsService::Session::Session(const SessionInfo& Info, Ref<SessionLog> Log, ProcessHandle ClientProcess)
+: m_Info(Info)
+, m_Log(std::move(Log))
+, m_ClientProcess(std::move(ClientProcess))
+{
+}
+SessionsService::Session::~Session() = default;
+
+const char*
+SessionsService::Session::InternLoggerNameLocked(std::string_view Name)
+{
+ if (Name.empty())
+ {
+ return "";
+ }
+ if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end())
+ {
+ return It->second;
+ }
+ const char* Arena = m_LogArena.DuplicateString(Name);
+ // The map's key view borrows from Arena (which lives as long as the
+ // session does), so it's safe to outlive `Name`.
+ m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena);
+ return Arena;
+}
+
+const char*
+SessionsService::Session::AllocateLogStringLocked(std::string_view Str)
+{
+ if (Str.empty())
+ {
+ return "";
+ }
+ return m_LogArena.DuplicateString(Str);
+}
+
+uint64_t
+SessionsService::Session::AppendLog(LogEntryInput Input)
+{
+ // Persist first (outside the deque lock ordering) so disk I/O doesn't
+ // starve cursor readers holding the shared lock. The SessionLog has its
+ // own internal mutex that serializes file writes.
+ if (m_Log)
+ {
+ m_Log->Append(Input);
+ }
+
RwLock::ExclusiveLockScope Lock(m_LogLock);
- m_LogEntries.push_back(std::move(Entry));
- ++m_TotalAppended;
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ const uint64_t NewCursor = ++m_TotalAppended;
while (m_LogEntries.size() > MaxLogEntries)
{
m_LogEntries.pop_front();
}
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::Session::AppendLogBatch(std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+
+ // Persist first (per-entry; SessionLog's internal mutex serializes
+ // these writes). We do this outside m_LogLock so file I/O doesn't
+ // stall cursor readers.
+ if (m_Log)
+ {
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_Log->Append(Input);
+ }
+ }
+
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ ++m_TotalAppended;
+ }
+ while (m_LogEntries.size() > MaxLogEntries)
+ {
+ m_LogEntries.pop_front();
+ }
+ return m_TotalAppended;
+}
+
+void
+SessionsService::Session::PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount)
+{
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ m_LogEntries.clear();
+ for (const LogEntryInput& Input : Tail)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = Input.Fields;
+ }
+ m_TotalAppended = TotalCount;
}
std::vector<SessionsService::LogEntry>
@@ -118,17 +792,134 @@ SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const
};
}
+uint64_t
+SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input)
+{
+ // Resolve the session without holding any external lock — GetSession
+ // acquires m_Lock shared briefly and returns a ref-counted handle.
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLog(std::move(Input));
+
+ // Fire after Session::m_LogLock is released (inside AppendLog) so the
+ // callback can safely call back into this service (e.g. to resolve
+ // the session again and fetch the delta for its subscribers) without
+ // any nested-lock concerns.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLogBatch(Inputs);
+
+ // One callback fires for the whole batch — subscribers see all
+ // entries in a single delta rather than N separate deltas. Fired
+ // after Session::m_LogLock is released for the same reason as the
+ // single-entry path.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+void
+SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback)
+{
+ m_LogAppendedCallback = std::move(Callback);
+}
+
//////////////////////////////////////////////////////////////////////////
-SessionsService::SessionsService() : m_Log(logging::Get("sessions"))
+SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions"))
{
+ if (StorageRoot.empty())
+ {
+ return;
+ }
+
+ m_SessionLogs = std::make_unique<SessionLogStore>(StorageRoot);
+
+ // Load all previously-persisted sessions as ended sessions. Their log
+ // tails are preloaded into the in-memory deque so the UI can view them.
+ std::vector<SessionLogStore::PersistedSession> Persisted = m_SessionLogs->Scan();
+ for (SessionLogStore::PersistedSession& PS : Persisted)
+ {
+ // Sessions that were active at shutdown need a synthetic ended time so
+ // they sort correctly in the UI.
+ if (PS.Info.EndedAt.GetTicks() == 0)
+ {
+ PS.Info.EndedAt = PS.Info.UpdatedAt;
+ }
+
+ Ref<Session> S = Ref(new Session(PS.Info));
+
+ // Load the log tail (no SessionLog attached — historical sessions do
+ // not receive new appends and the file handle is released here).
+ // PreloadEntries copies/interns each input's strings into the
+ // session's arena, after which Loaded.OwnedBuffers can be
+ // released along with the rest of the LoadResult.
+ SessionLog Reader(PS.LogPath);
+ SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries);
+ S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount);
+
+ m_EndedSessions.push_back(std::move(S));
+ }
+
+ ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot);
}
SessionsService::~SessionsService() = default;
bool
-SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata)
+SessionsService::RegisterSession(const Oid& SessionId,
+ std::string AppName,
+ std::string Mode,
+ std::string Platform,
+ uint32_t ClientPid,
+ const Oid& ParentSessionId,
+ const Oid& JobId,
+ CbObjectView Metadata)
{
+ // Open a process handle eagerly — BEFORE any pid-reuse window opens. On
+ // Windows the handle is tied to the specific process instance, so a
+ // later pid recycle can't fool later liveness checks. Do the syscall
+ // outside the service lock (it's a kernel round-trip). On POSIX this is
+ // effectively a no-op (just stores the pid).
+ ProcessHandle ClientProcess;
+ if (ClientPid != 0)
+ {
+ std::error_code Ec;
+ ClientProcess.Initialize(static_cast<int>(ClientPid), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled",
+ SessionId,
+ ClientPid,
+ Ec.message());
+ }
+ }
+
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
// Log outside the lock scope - InProcSessionLogSink calls back into
// GetSession() which acquires m_Lock shared, so logging while holding
// m_Lock exclusively would deadlock.
@@ -141,35 +932,69 @@ SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std:
}
const DateTime Now = DateTime::Now();
- m_Sessions.emplace(SessionId,
- Ref(new Session(SessionInfo{.Id = SessionId,
- .AppName = AppName,
- .Mode = Mode,
- .JobId = JobId,
- .Metadata = CbObject::Clone(Metadata),
- .CreatedAt = Now,
- .UpdatedAt = Now})));
+ SessionInfo Info{.Id = SessionId,
+ .AppName = AppName,
+ .Mode = Mode,
+ .Platform = Platform,
+ .ClientPid = ClientPid,
+ .ParentSessionId = ParentSessionId,
+ .JobId = JobId,
+ .Metadata = CbObject::Clone(Metadata),
+ .CreatedAt = Now,
+ .UpdatedAt = Now};
+
+ Ref<SessionLog> Log;
+ if (m_SessionLogs)
+ {
+ Log = m_SessionLogs->GetOrCreateLogForSession(SessionId);
+ }
+ m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess))));
+ PersistedInfo = std::move(Info);
+ }
+
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
}
- ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId);
+ // Include the tracked pid so the log makes it obvious whether the client
+ // opted into liveness tracking (and whether our OpenProcess succeeded).
+ // "pid: 0" = no liveness tracking (remote or client didn't report);
+ // "pid: N" with an immediately-prior warning = OpenProcess failed.
+ ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})",
+ SessionId,
+ AppName,
+ Mode,
+ Platform,
+ ClientPid,
+ ParentSessionId,
+ JobId);
return true;
}
bool
SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
-
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
{
- return false;
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ auto It = m_Sessions.find(SessionId);
+ if (It == m_Sessions.end())
+ {
+ return false;
+ }
+
+ It.value()->UpdateMetadata(Metadata);
+ PersistedInfo = It.value()->Info();
}
- It.value()->UpdateMetadata(Metadata);
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
- const SessionInfo& Info = It.value()->Info();
- ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId);
+ ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId);
return true;
}
@@ -178,13 +1003,23 @@ SessionsService::GetSession(const Oid& SessionId) const
{
RwLock::SharedLockScope Lock(m_Lock);
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end())
{
- return {};
+ return It->second;
+ }
+
+ // Fall back to ended sessions so HTTP consumers can fetch logs/metadata
+ // for sessions that have finished (including sessions loaded from disk on
+ // startup as historical ended sessions).
+ for (const Ref<Session>& Ended : m_EndedSessions)
+ {
+ if (Ended->Info().Id == SessionId)
+ {
+ return Ended;
+ }
}
- return It->second;
+ return {};
}
std::vector<Ref<SessionsService::Session>>
@@ -202,10 +1037,14 @@ SessionsService::GetSessions() const
}
bool
-SessionsService::RemoveSession(const Oid& SessionId)
+SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason)
{
- std::string RemovedAppName;
- Oid RemovedJobId;
+ std::string RemovedAppName;
+ Oid RemovedJobId;
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
+ bool Persist = false;
+ Ref<Session> Ended;
+ const DateTime EndTime = DateTime::Now();
{
RwLock::ExclusiveLockScope Lock(m_Lock);
@@ -219,13 +1058,50 @@ SessionsService::RemoveSession(const Oid& SessionId)
RemovedAppName = It.value()->Info().AppName;
RemovedJobId = It.value()->Info().JobId;
- Ref<Session> Ended = It.value();
- Ended->SetEndedAt(DateTime::Now());
- m_EndedSessions.push_back(std::move(Ended));
+ Ended = It.value();
+ Ended->SetEndedAt(EndTime);
+ if (m_SessionLogs)
+ {
+ PersistedInfo = Ended->Info();
+ Persist = true;
+ }
+ m_EndedSessions.push_back(Ended);
m_Sessions.erase(It);
}
+ // Synthetic "Session ended" entry is appended *after* m_Lock is
+ // released. AppendLog hits disk via SessionLog::Append, so doing it
+ // inside the exclusive scope would block every other session lookup
+ // / registration / list while the I/O completes. The callback that
+ // pushes the delta to WS subscribers also fires from outside the
+ // lock — same self-deadlock concern as the GetSession path.
+ uint64_t SyntheticEndCursor = 0;
+ {
+ ExtendableStringBuilder<128> MsgBuilder;
+ MsgBuilder << "Session ended"sv;
+ if (!Reason.empty())
+ {
+ MsgBuilder << ": "sv << Reason;
+ }
+ SyntheticEndCursor = Ended->AppendLog(LogEntryInput{
+ .Timestamp = EndTime,
+ .Level = logging::Info,
+ .LoggerName = "sessions"sv,
+ .Message = MsgBuilder.ToView(),
+ });
+ }
+
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, SyntheticEndCursor);
+ }
+
+ if (Persist)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
+
ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId);
return true;
}
@@ -244,4 +1120,202 @@ SessionsService::GetSessionCount() const
return m_Sessions.size();
}
+// The "when was this session last relevant" timestamp used for age checks
+// and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt.
+static DateTime
+ReferenceTime(const SessionsService::SessionInfo& Info)
+{
+ return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt;
+}
+
+size_t
+SessionsService::CheckProcessLiveness()
+{
+ // Snapshot active sessions under a shared lock, then probe liveness and
+ // drop dead ones without holding the service lock (IsRunning() is a
+ // kernel round-trip and RemoveSession() takes the lock exclusively).
+ std::vector<Ref<Session>> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_Sessions.size());
+ for (const auto& [Id, SessionRef] : m_Sessions)
+ {
+ if (SessionRef->GetClientProcess().IsValid())
+ {
+ Candidates.push_back(SessionRef);
+ }
+ }
+ }
+
+ size_t Ended = 0;
+ for (const Ref<Session>& S : Candidates)
+ {
+ // m_ClientProcess is set once at construction and never mutated, so
+ // reading it here without synchronization is safe.
+ if (!S->GetClientProcess().IsRunning())
+ {
+ // Build the termination reason. On Windows, GetExitCode() returns
+ // the real OS exit code and is cheap right after !IsRunning(); we
+ // map the common NTSTATUS codes (Ctrl-C, access violation, DLL
+ // init failure, …) to human-readable names. On POSIX the exit
+ // code is only populated via Wait() which we never call, so
+ // stick with the plain reason.
+ std::string Reason = "process exited";
+#if ZEN_PLATFORM_WINDOWS
+ const uint32_t ExitCode = static_cast<uint32_t>(S->GetClientProcess().GetExitCode());
+ if (ExitCode != 0)
+ {
+ Reason = DescribeWindowsExitCode(ExitCode);
+ }
+#endif
+ if (RemoveSession(S->Info().Id, Reason))
+ {
+ ++Ended;
+ }
+ }
+ }
+ return Ended;
+}
+
+SessionsService::PruneResult
+SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes)
+{
+ PruneResult Result;
+ std::vector<Oid> ToDelete;
+
+ // Phase 1: age + count pruning (fast; purely in-memory).
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ const uint64_t NowTicks = DateTime::Now().GetTicks();
+ const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0;
+ const DateTime Cutoff{CutoffTicks};
+
+ // Age-based pruning: drop ended sessions whose reference time is
+ // older than Cutoff.
+ auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ if (ReferenceTime(S->Info()) < Cutoff)
+ {
+ ToDelete.push_back(S->Info().Id);
+ return true;
+ }
+ return false;
+ });
+ Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt);
+ m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end());
+
+ // Count-based pruning: keep at most MaxCount sessions total. Active
+ // sessions are never touched; if there are already >= MaxCount active
+ // sessions then all ended sessions get evicted.
+ const size_t ActiveCount = m_Sessions.size();
+ const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0;
+ if (m_EndedSessions.size() > EndedTarget)
+ {
+ const size_t ToRemove = m_EndedSessions.size() - EndedTarget;
+ // Move the `ToRemove` oldest entries to the front.
+ std::partial_sort(
+ m_EndedSessions.begin(),
+ m_EndedSessions.begin() + ToRemove,
+ m_EndedSessions.end(),
+ [](const Ref<Session>& A, const Ref<Session>& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); });
+ for (size_t i = 0; i < ToRemove; ++i)
+ {
+ ToDelete.push_back(m_EndedSessions[i]->Info().Id);
+ }
+ m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove);
+ Result.ExpiredByCount = ToRemove;
+ }
+ }
+
+ // Phase 1 disk deletion, outside the service lock.
+ if (m_SessionLogs)
+ {
+ for (const Oid& Id : ToDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ }
+
+ // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions
+ // (id + reference time) under a shared lock, then stat each directory
+ // outside the lock so we don't hold writers off during filesystem calls.
+ if (!m_SessionLogs)
+ {
+ return Result;
+ }
+
+ struct Candidate
+ {
+ Oid Id;
+ DateTime RefTime;
+ uint64_t Size = 0;
+ };
+ std::vector<Candidate> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_EndedSessions.size());
+ for (const Ref<Session>& S : m_EndedSessions)
+ {
+ Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())});
+ }
+ }
+
+ uint64_t TotalBytes = 0;
+ for (Candidate& C : Candidates)
+ {
+ C.Size = m_SessionLogs->GetSessionSize(C.Id);
+ TotalBytes += C.Size;
+ }
+
+ if (TotalBytes <= MaxStorageBytes)
+ {
+ return Result;
+ }
+
+ // Oldest first so we evict in chronological order.
+ std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; });
+
+ std::vector<Oid> StorageDelete;
+ uint64_t Reclaimed = 0;
+ const uint64_t NeedBytes = TotalBytes - MaxStorageBytes;
+ for (const Candidate& C : Candidates)
+ {
+ if (Reclaimed >= NeedBytes)
+ {
+ break;
+ }
+ StorageDelete.push_back(C.Id);
+ Reclaimed += C.Size;
+ }
+
+ if (StorageDelete.empty())
+ {
+ return Result;
+ }
+
+ // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession
+ // calls between the snapshot and here will have inserted new entries at
+ // the back, which we safely leave alone.
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ tsl::robin_map<Oid, uint8_t, Oid::Hasher> IdSet;
+ for (const Oid& Id : StorageDelete)
+ {
+ IdSet[Id] = 1;
+ }
+ auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ return IdSet.contains(S->Info().Id);
+ });
+ m_EndedSessions.erase(It, m_EndedSessions.end());
+ }
+
+ for (const Oid& Id : StorageDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ Result.ExpiredByStorage = StorageDelete.size();
+
+ return Result;
+}
+
} // namespace zen
diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h
index a84ca6506..a722704e0 100644
--- a/src/zenserver/sessions/sessions.h
+++ b/src/zenserver/sessions/sessions.h
@@ -4,6 +4,8 @@
#include <zencore/compactbinary.h>
#include <zencore/logbase.h>
+#include <zencore/memory/memoryarena.h>
+#include <zencore/process.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -11,7 +13,10 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/deque.h>
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <filesystem>
+#include <functional>
#include <optional>
+#include <span>
#include <string>
#include <vector>
@@ -34,25 +39,62 @@ public:
Oid Id;
std::string AppName;
std::string Mode;
- Oid JobId;
- CbObject Metadata;
- DateTime CreatedAt;
- DateTime UpdatedAt;
- DateTime EndedAt{0};
+ std::string Platform; // Reported by the client, e.g. "windows", "linux", "macos"
+ uint32_t ClientPid = 0; // Non-zero = local PID to probe for liveness. 0 = don't track.
+ Oid ParentSessionId;
+ // Optional task/action identifier used to associate this session with a
+ // specific unit of work. Distinct from ParentSessionId, which records
+ // process/session ancestry.
+ Oid JobId;
+ CbObject Metadata;
+ DateTime CreatedAt;
+ DateTime UpdatedAt;
+ DateTime EndedAt{0};
};
+ /// Stored form of a log entry. The string fields are arena-borrowed
+ /// `const char*` — they live in the owning Session's MemoryArena and
+ /// are valid only for that Session's lifetime. Default copy is
+ /// intentionally shallow (string pointers are shared with the source);
+ /// callers must not let copies outlive the originating Session.
+ ///
+ /// Build entries via `LogEntryInput` and route them through
+ /// `Session::AppendLog` / `AppendLogBatch`, which intern logger names
+ /// and arena-allocate the other strings before storing.
struct LogEntry
{
- DateTime Timestamp;
- std::string Level;
- std::string Message;
- CbObject Data;
+ DateTime Timestamp{0};
+ // Sentinel: Off means "no level set" (e.g. plain-text POSTed entries
+ // where the client didn't include a level). Real log entries use
+ // Trace..Critical, so Off is free to reuse as "omit on serialize".
+ logging::LogLevel Level = logging::Off;
+ // Arena pointers (null-terminated). Empty string is the default
+ // — never null, so callers don't need to guard.
+ const char* LoggerName = ""; // Interned: one canonical copy per unique name across the session.
+ const char* Message = ""; // For structured entries: the rendered form (populated at intake).
+ const char* Format = ""; // UE_LOGFMT template; "" for plain entries.
+ CbObject Fields; // Present only when Format is non-empty.
+ };
+
+ /// Input form used to build an entry on the way into a Session. The
+ /// string_view fields are caller-borrowed; AppendLog interns/copies
+ /// them into the Session's arena before any LogEntry is built. Use
+ /// this struct rather than constructing LogEntry directly so the
+ /// arena ownership invariant stays one-sided.
+ struct LogEntryInput
+ {
+ DateTime Timestamp{0};
+ logging::LogLevel Level = logging::Off;
+ std::string_view LoggerName;
+ std::string_view Message;
+ std::string_view Format;
+ CbObject Fields;
};
class Session : public TRefCounted<Session>
{
public:
- Session(const SessionInfo& Info);
+ Session(const SessionInfo& Info, Ref<SessionLog> Log = {}, ProcessHandle ClientProcess = {});
~Session();
Session(Session&&) = delete;
@@ -67,12 +109,29 @@ public:
void SetEndedAt(DateTime When) { m_Info.EndedAt = When; }
- void AppendLog(LogEntry Entry);
+ /// Appends an entry to the in-memory deque and to the persisted
+ /// log. Returns the new cursor value (m_TotalAppended post-
+ /// increment). Logger name is interned, message and format are
+ /// arena-allocated — the input's string_views may safely be
+ /// caller-stack-bound.
+ uint64_t AppendLog(LogEntryInput Input);
+
+ /// Append-many counterpart that takes the deque lock exactly once
+ /// for the whole batch. Use this when an inbound HTTP POST carries
+ /// multiple entries — single-lock semantics keep entries from one
+ /// caller contiguous on the wire even when other appends race in,
+ /// and the WS-push observer can fire just once for the whole batch.
+ /// Returns the new cursor (the value at the tail of the batch).
+ uint64_t AppendLogBatch(std::span<LogEntryInput> Inputs);
+
std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const;
uint64_t GetLogCount() const;
/// Returns entries appended after the given cursor and the new cursor value.
/// A cursor of 0 returns all entries currently in the deque.
+ /// The returned LogEntries borrow strings from this Session's
+ /// arena — callers must hold a Ref<Session> for as long as they
+ /// keep the result.
struct CursorResult
{
std::vector<LogEntry> Entries;
@@ -81,26 +140,118 @@ public:
};
CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const;
+ // Seed this session with pre-existing log entries (e.g. loaded from disk
+ // on startup). Sets the total-appended counter to reflect what was on
+ // disk so cursors remain meaningful for historical sessions. The inputs
+ // are interned/arena-allocated into this session.
+ void PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount);
+
+ /// Process handle used for client-liveness checks. Acquired at
+ /// registration time (while the pid is known to refer to the reporting
+ /// process) and held for the session's lifetime; on Windows this is a
+ /// real HANDLE tied to the specific process instance and is immune to
+ /// pid reuse. Invalid (IsValid() == false) for remote sessions or when
+ /// OpenProcess() failed. Set once at construction — no synchronization
+ /// needed for readers.
+ const ProcessHandle& GetClientProcess() const { return m_ClientProcess; }
+ ProcessHandle& GetClientProcess() { return m_ClientProcess; }
+
+ static constexpr uint32_t MaxLogEntries = 10000;
+
private:
+ // Intern a logger name into m_LogArena and return the canonical
+ // pointer for that name. Subsequent calls with the same string
+ // return the same pointer. Caller must hold m_LogLock exclusive.
+ const char* InternLoggerNameLocked(std::string_view Name);
+
+ // Allocate a copy of Str into m_LogArena and return a null-
+ // terminated pointer. No deduplication. Caller must hold m_LogLock
+ // exclusive. Empty input returns "" (no allocation).
+ const char* AllocateLogStringLocked(std::string_view Str);
+
SessionInfo m_Info;
Ref<SessionLog> m_Log;
+ ProcessHandle m_ClientProcess;
mutable RwLock m_LogLock;
eastl::deque<LogEntry> m_LogEntries;
uint64_t m_TotalAppended = 0; // monotonically increasing counter
-
- static constexpr uint32_t MaxLogEntries = 10000;
+ // String storage for the in-memory deque. LoggerName is interned
+ // (one canonical copy per unique name); Message and Format are
+ // duplicated per entry. Both die with the Session — so the
+ // LogEntry pointers do too. Sized to fit a typical session's
+ // strings in one chunk; spills to additional chunks otherwise.
+ MemoryArena m_LogArena{4096};
+ tsl::robin_map<std::string_view, const char*> m_InternedLoggerNames;
};
- SessionsService();
+ /// Construct a SessionsService. If StorageRoot is non-empty, session
+ /// metadata and logs are persisted under that directory (one subdirectory
+ /// per session id) and previously-persisted sessions are loaded as ended.
+ explicit SessionsService(std::filesystem::path StorageRoot = {});
~SessionsService();
- bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata);
- bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
- Ref<Session> GetSession(const Oid& SessionId) const;
+ bool RegisterSession(const Oid& SessionId,
+ std::string AppName,
+ std::string Mode,
+ std::string Platform,
+ uint32_t ClientPid,
+ const Oid& ParentSessionId,
+ const Oid& JobId,
+ CbObjectView Metadata);
+ bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
+ Ref<Session> GetSession(const Oid& SessionId) const;
std::vector<Ref<Session>> GetSessions() const;
std::vector<Ref<Session>> GetEndedSessions() const;
- bool RemoveSession(const Oid& SessionId);
- uint64_t GetSessionCount() const;
+ /// Ends a session. If Reason is non-empty, a synthetic log line is
+ /// appended to the session log before it's moved to ended so the
+ /// historical log has a clear closing event.
+ bool RemoveSession(const Oid& SessionId, std::string_view Reason = {});
+ uint64_t GetSessionCount() const;
+
+ /// Appends a log entry to `SessionId` and, if the session exists,
+ /// invokes the log-appended callback with the new cursor so downstream
+ /// push subscribers (e.g. the HTTP WS broadcast) can deliver the delta
+ /// without polling. Returns the new cursor, or 0 if the session is
+ /// unknown. Fires the callback AFTER any internal locks are released
+ /// so the callback can safely call back into this service.
+ uint64_t AppendLog(const Oid& SessionId, LogEntryInput Input);
+
+ /// Batch counterpart of AppendLog. Atomic with respect to other
+ /// appends to the same session — entries land contiguously on the
+ /// wire and persist in order — and fires exactly one push-callback
+ /// for the whole batch. Empty batches and unknown sessions are
+ /// no-ops returning 0.
+ uint64_t AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs);
+
+ /// Observer fired after an entry is appended to any session. Replaces
+ /// any previously set callback. Pass {} to clear. Only one listener is
+ /// supported — the single consumer today is the HTTP WebSocket push.
+ using LogAppendedCallback = std::function<void(const Oid& SessionId, uint64_t NewCursor)>;
+ void SetLogAppendedCallback(LogAppendedCallback Callback);
+
+ /// Drop ended sessions that are too old, that push us over the count
+ /// limit, or that push the on-disk footprint over the byte budget, and
+ /// delete their persisted directories. Active sessions are never
+ /// pruned. Returns the number removed by each criterion.
+ struct PruneResult
+ {
+ size_t ExpiredByAge = 0;
+ size_t ExpiredByCount = 0;
+ size_t ExpiredByStorage = 0;
+ };
+ PruneResult PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes);
+
+ /// End any active session whose tracked client process is no longer
+ /// running. Sessions with an invalid ProcessHandle (remote, or
+ /// OpenProcess failed at registration) are skipped. Returns the number
+ /// of sessions ended by this pass.
+ size_t CheckProcessLiveness();
+
+ // Tuning defaults. Expressed in whole days / bytes so they're easy to
+ // override from a future command-line flag without touching internals.
+ static constexpr int kDefaultMaxSessionAgeDays = 365;
+ static constexpr size_t kDefaultMaxSessionCount = 1000;
+ static constexpr uint64_t kDefaultMaxStorageBytes = 50ull * 1024 * 1024; // 50 MiB
private:
LoggerRef& Log() { return m_Log; }
@@ -110,6 +261,10 @@ private:
tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions;
std::vector<Ref<Session>> m_EndedSessions;
std::unique_ptr<SessionLogStore> m_SessionLogs;
+ // Set once at wiring-time (single consumer), never reassigned while
+ // hot, so no dedicated lock — just a plain member. Copy-on-call
+ // guards against the theoretical re-register race below.
+ LogAppendedCallback m_LogAppendedCallback;
};
} // namespace zen
diff --git a/src/zenserver/stats/statsreporter.cpp b/src/zenserver/stats/statsreporter.cpp
index ff055cf18..4898f3ad5 100644
--- a/src/zenserver/stats/statsreporter.cpp
+++ b/src/zenserver/stats/statsreporter.cpp
@@ -3,12 +3,85 @@
#include "statsreporter.h"
#include <zencore/logging.h>
+#include <zencore/profiling/counterstrace.h>
#include <zencore/trace.h>
#include <zennet/statsdclient.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/functional.h>
+#include <EASTL/hash_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <memory>
+#include <string>
+
namespace zen {
-StatsReporter::StatsReporter()
+// Decorator that forwards every metric call to an inner StatsMetrics (statsd
+// in production) and additionally emits a counter trace point so the metric
+// shows up in the .utrace stream / `zen trace serve` viewer.
+//
+// Counter ids are allocated lazily on first sighting of a metric name. The
+// name string is owned by the lookup map so the TraceCounter keeps a stable
+// pointer to it for late-init re-emission.
+class TracingStatsMetrics : public StatsMetrics
+{
+public:
+ void Rebind(StatsMetrics* Inner) { m_Inner = Inner; }
+
+ void Increment(std::string_view Metric) override
+ {
+ if (m_Inner)
+ m_Inner->Increment(Metric);
+ Counter(Metric).Increment();
+ }
+
+ void Decrement(std::string_view Metric) override
+ {
+ if (m_Inner)
+ m_Inner->Decrement(Metric);
+ Counter(Metric).Decrement();
+ }
+
+ void Count(std::string_view Metric, int64_t CountDelta) override
+ {
+ if (m_Inner)
+ m_Inner->Count(Metric, CountDelta);
+ Counter(Metric).Add(CountDelta);
+ }
+
+ void Gauge(std::string_view Metric, uint64_t CurrentValue) override
+ {
+ if (m_Inner)
+ m_Inner->Gauge(Metric, CurrentValue);
+ Counter(Metric).SetAlways(int64_t(CurrentValue));
+ }
+
+ void Meter(std::string_view Metric, uint64_t IncrementValue) override
+ {
+ if (m_Inner)
+ m_Inner->Meter(Metric, IncrementValue);
+ Counter(Metric).Add(int64_t(IncrementValue));
+ }
+
+private:
+ TraceCounterInt& Counter(std::string_view Metric)
+ {
+ if (auto It = m_Counters.find_as(Metric, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Counters.end())
+ {
+ return *It->second;
+ }
+ auto [Inserted, _] = m_Counters.try_emplace(std::string(Metric), nullptr);
+ Inserted->second = std::make_unique<TraceCounterInt>(Inserted->first.c_str(), TraceCounterDisplayHint::None);
+ return *Inserted->second;
+ }
+
+ StatsMetrics* m_Inner = nullptr;
+ eastl::hash_map<std::string, std::unique_ptr<TraceCounterInt>, std::hash<std::string>, std::equal_to<std::string>> m_Counters;
+};
+
+StatsReporter::StatsReporter() : m_TracingMetrics(std::make_unique<TracingStatsMetrics>())
{
}
@@ -52,13 +125,18 @@ void
StatsReporter::ReportStats()
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_Statsd)
+
+ // Always run providers through the tracing decorator so counter trace
+ // points fire even when statsd is disabled. The decorator no-ops the
+ // inner forward when m_Statsd is null.
+ m_TracingMetrics->Rebind(m_Statsd.get());
+ for (StatsProvider* Provider : m_Providers)
{
- for (StatsProvider* Provider : m_Providers)
- {
- Provider->ReportMetrics(*m_Statsd);
- }
+ Provider->ReportMetrics(*m_TracingMetrics);
+ }
+ if (m_Statsd)
+ {
m_Statsd->Flush();
}
}
diff --git a/src/zenserver/stats/statsreporter.h b/src/zenserver/stats/statsreporter.h
index b4174073c..82219bc14 100644
--- a/src/zenserver/stats/statsreporter.h
+++ b/src/zenserver/stats/statsreporter.h
@@ -10,6 +10,7 @@
namespace zen {
class StatsDaemonClient;
+class TracingStatsMetrics;
class StatsReporter
{
@@ -26,9 +27,10 @@ public:
void AddProvider(StatsProvider* Provider);
private:
- RwLock m_Lock;
- std::unique_ptr<StatsDaemonClient> m_Statsd;
- std::vector<StatsProvider*> m_Providers;
+ RwLock m_Lock;
+ std::unique_ptr<StatsDaemonClient> m_Statsd;
+ std::unique_ptr<TracingStatsMetrics> m_TracingMetrics; // owns counter id table; rebinds to m_Statsd
+ std::vector<StatsProvider*> m_Providers;
};
} // namespace zen
diff --git a/src/zenserver/storage/storageconfig.cpp b/src/zenserver/storage/storageconfig.cpp
index bb4f053e4..b4d97257d 100644
--- a/src/zenserver/storage/storageconfig.cpp
+++ b/src/zenserver/storage/storageconfig.cpp
@@ -379,7 +379,6 @@ ZenStorageServerConfigurator::AddConfigOptions(LuaConfig::Options& LuaOptions)
////// server
LuaOptions.AddOption("server.pluginsconfigfile"sv, ServerOptions.PluginsConfigFile, "plugins-config"sv);
- LuaOptions.AddOption("sessions.url"sv, ServerOptions.SessionsTargetUrl, "sessions-url"sv);
////// objectstore
LuaOptions.AddOption("server.objectstore.enabled"sv, ServerOptions.ObjectStoreEnabled, "objectstore-enabled"sv);
@@ -630,8 +629,6 @@ ZenStorageServerCmdLineOptions::AddCliOptions(cxxopts::Options& options, ZenStor
cxxopts::value(ServerOptions.ScrubOptions)->implicit_value("yes"),
"(nocas,nogc,nodelete,yes,no)*");
- options.add_options()("sessions-url", "URL of remote zenserver to announce session to", cxxopts::value<std::string>(SessionsTargetUrl));
-
AddSecurityOptions(options, ServerOptions);
AddCacheOptions(options, ServerOptions);
AddGcOptions(options, ServerOptions);
@@ -1088,7 +1085,6 @@ ZenStorageServerCmdLineOptions::ApplyOptions(cxxopts::Options& options, ZenStora
{.Name = OpenIdProviderName, .Url = OpenIdProviderUrl, .ClientId = OpenIdClientId});
}
- ServerOptions.SessionsTargetUrl = SessionsTargetUrl;
ServerOptions.ObjectStoreConfig = ParseBucketConfigs(BucketConfigs);
ServerOptions.OidcTokenExecutable = MakeSafeAbsolutePath(OidcTokenExecutable);
}
diff --git a/src/zenserver/storage/storageconfig.h b/src/zenserver/storage/storageconfig.h
index fec8fd70b..bb6c929a9 100644
--- a/src/zenserver/storage/storageconfig.h
+++ b/src/zenserver/storage/storageconfig.h
@@ -163,7 +163,6 @@ struct ZenStorageServerConfig : public ZenServerConfig
bool RestrictContentTypes = false;
std::filesystem::path OidcTokenExecutable;
bool AllowExternalOidcTokenExe = true;
- std::string SessionsTargetUrl;
};
struct ZenStorageServerCmdLineOptions
@@ -185,8 +184,6 @@ struct ZenStorageServerCmdLineOptions
void AddSecurityOptions(cxxopts::Options& options, ZenStorageServerConfig& ServerOptions);
- std::string SessionsTargetUrl;
-
std::string UpstreamCachePolicyOptions;
void AddCacheOptions(cxxopts::Options& options, ZenStorageServerConfig& ServerOptions);
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index 44291395a..e7561e446 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -249,10 +249,11 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions
if (!ServerOptions.SessionsTargetUrl.empty())
{
m_SessionsClient = std::make_unique<SessionsServiceClient>(SessionsServiceClient::Options{
- .TargetUrl = ServerOptions.SessionsTargetUrl,
- .AppName = "zenserver",
- .Mode = GetServerMode(),
- .SessionId = GetSessionId(),
+ .TargetUrl = ServerOptions.SessionsTargetUrl,
+ .AppName = "zenserver",
+ .Mode = GetServerMode(),
+ .SessionId = GetSessionId(),
+ .ParentSessionId = GetParentSessionId(),
});
}
@@ -846,10 +847,14 @@ ZenStorageServer::Run()
SetNewState(kRunning);
- OnReady();
-
+ // Register the self-session and replay the backlog into it BEFORE
+ // OnReady disables the backlog — otherwise the in-proc session sink
+ // attaches against a disabled backlog and shows nothing from the
+ // startup window.
StartSelfSession("zenserver");
+ OnReady();
+
if (m_SessionsClient)
{
m_SessionsClient->Announce();
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 6bf22eef8..44fa01ea4 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -15,6 +15,7 @@
#include <zencore/logging.h>
#include <zencore/logging/broadcastsink.h>
#include <zencore/memory/fmalloc.h>
+#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/sentryintegration.h>
#include <zencore/session.h>
@@ -190,10 +191,10 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState::
m_Http->RegisterService(m_StatsService);
m_StatsReporter.Initialize(ServerOptions.StatsConfig);
- if (ServerOptions.StatsConfig.Enabled)
- {
- EnqueueStatsReportingTimer();
- }
+ // Run the reporting timer unconditionally: even when statsd is disabled
+ // the StatsReporter still fans out to providers so counter trace points
+ // fire into the active .utrace stream (see TracingStatsMetrics).
+ EnqueueStatsReportingTimer();
// clang-format off
HealthServiceInfo HealthInfo {
@@ -232,7 +233,7 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState::
LogSettingsSummary(ServerOptions);
- InitializeSessions();
+ InitializeSessions(ServerOptions.UseInProcSessionLogging);
return EffectiveBasePort;
}
@@ -265,7 +266,7 @@ ZenServerBase::ShutdownServices()
if (m_SessionsService)
{
- m_SessionsService->RemoveSession(GetSessionId());
+ m_SessionsService->RemoveSession(GetSessionId(), "server shutdown");
}
m_HttpSessionsService.reset();
@@ -279,15 +280,31 @@ ZenServerBase::ShutdownServices()
}
void
-ZenServerBase::InitializeSessions()
+ZenServerBase::InitializeSessions(bool UseInProcSessionLogging)
{
- m_SessionsService = std::make_unique<SessionsService>();
+ // Persist session metadata and logs under <DataRoot>/sessions. If no data
+ // root is configured (e.g. some test contexts) fall back to in-memory only.
+ std::filesystem::path SessionsRoot;
+ if (!m_DataRoot.empty())
+ {
+ SessionsRoot = m_DataRoot / "sessions";
+ }
+
+ m_SessionsService = std::make_unique<SessionsService>(std::move(SessionsRoot));
m_HttpSessionsService = std::make_unique<HttpSessionsService>(m_StatusService, m_StatsService, *m_SessionsService, m_IoContext);
m_HttpSessionsService->SetSelfSessionId(GetSessionId());
- m_InProcSessionLogSink = logging::SinkPtr(new InProcSessionLogSink(*m_SessionsService));
- m_InProcSessionLogSink->SetLevel(logging::Info);
- GetDefaultBroadcastSink()->AddSink(m_InProcSessionLogSink);
+ if (UseInProcSessionLogging)
+ {
+ // Create the sink up front but don't attach it to the broadcast
+ // yet — the self-session isn't registered with the service until
+ // StartSelfSession runs, and a sink that fires Log() before then
+ // silently drops every line because Service.AppendLog can't
+ // resolve the session id. The attach (with backlog replay) is
+ // performed once StartSelfSession has registered.
+ m_InProcSessionLogSink = logging::SinkPtr(new InProcSessionLogSink(*m_SessionsService));
+ m_InProcSessionLogSink->SetLevel(logging::Info);
+ }
}
void
@@ -295,7 +312,29 @@ ZenServerBase::StartSelfSession(std::string_view AppName)
{
if (m_SessionsService)
{
- m_SessionsService->RegisterSession(GetSessionId(), std::string(AppName), GetServerMode(), Oid::Zero, {});
+ m_SessionsService->RegisterSession(GetSessionId(),
+ std::string(AppName),
+ GetServerMode(),
+ std::string(GetRuntimePlatformName()),
+ // Report our own pid so it's visible in `zen sessions
+ // ls` and the dashboard. The liveness sweep will probe
+ // this entry and always find it alive — that's
+ // consistent (we're running iff the sweep is running)
+ // and the extra IsRunning check is cheap.
+ static_cast<uint32_t>(GetCurrentProcessId()),
+ GetParentSessionId(),
+ Oid::Zero,
+ {});
+
+ // Now that the self-session exists in the service, attach the
+ // in-proc sink and replay everything that was buffered into the
+ // log backlog up to this point. This brings every line emitted
+ // since process start into the self-session's persisted log,
+ // even though the session itself was only registered just now.
+ if (m_InProcSessionLogSink)
+ {
+ AttachSinkWithBacklogReplay(m_InProcSessionLogSink);
+ }
}
}
@@ -345,6 +384,12 @@ ZenServerBase::EnsureIoRunner()
void
ZenServerBase::OnReady()
{
+ // Bootstrap window is closed: every sink that's going to attach has
+ // attached, the run loop is up. Drop the captured early-startup
+ // backlog so it doesn't pin ~256KB of memory for the rest of the
+ // process. Subsequent log calls bypass the backlog cheaply.
+ DisableLogBacklog();
+
if (m_ServerEntry)
{
m_ServerEntry->SignalReady();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 995ff054f..b7c82ca28 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -146,7 +146,7 @@ protected:
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
private:
- void InitializeSessions();
+ void InitializeSessions(bool UseInProcSessionLogging);
void InitializeSecuritySettings(const ZenServerConfig& ServerOptions);
};
class ZenServerMain