aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/sessions.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-05-05 15:47:48 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 15:47:48 +0200
commit01286c6233347d561064fc9e6cf9deaf2087ceb7 (patch)
treebdbfdf01725baa2d2dd3d73727e6506b41421dff /src/zenserver/sessions/sessions.cpp
parenthub async s3 client (#1024) (diff)
downloadarchived-zen-main.tar.xz
archived-zen-main.zip
sessions: persist to disk, prune, track client liveness, accept UE_LOGFMT (#1014)HEADmain
Branch started as a sessions-service overhaul (persistence, client liveness, UE_LOGFMT intake) and grew to pick up adjacent infrastructure work: an early-startup log backlog, a hardened `MemoryArena`, the `zen trace serve` viewer gaining a counter view + compact timeline + tabbed callsite panel, defensive fixes in the third-party `tourist` trace parser, a series of allocation reductions across the HTTP and compact-binary hot paths, and a new `zen sessions` CLI command tree. ## Sessions service **Persistence.** Each session lives on disk under `<DataRoot>/sessions/<id>/` as `info.cb` (metadata) plus `log.bin` (length-prefixed CbObject log records). On startup the service scans that directory and loads prior sessions as ended sessions, preloading the tail of each log so historical views work after a restart. `SessionLog` is noexcept-constructed and falls back to a disabled state on disk errors, so a bad disk can't take down `RegisterSession`. `GetSession` falls back to the ended-sessions list (fixes historical log fetches over HTTP). `LoadTail` counts only successfully-parsed records. **Pruning.** Periodic cleanup task drops ended sessions once any of three caps is exceeded: age (default 1 year), count (default 1000), or total on-disk footprint (default 50 MiB). Runs 30 s after startup, hourly thereafter. Active sessions never pruned; disk removal and directory stat happen outside the exclusive lock so a slow filesystem can't stall lookups. **Client liveness.** Sessions carry a `ProcessHandle` for the client-reported pid, captured at registration time so Windows pid recycling can't produce false positives. A 30 s asio timer probes liveness and ends dead sessions through the normal remove path, producing a synthetic `Session ended: process exited (...)` line persisted to `log.bin`. Windows decodes common NTSTATUS exit codes to human names (Ctrl-C, access violation, stack overflow, ...); POSIX stays at plain `process exited`. Clients auto-fill `ClientPid` only for local targets (unix socket / loopback); the server defensively accepts pids only from `IsLocalMachineRequest()` peers. zenserver also reports its own pid when registering its self-session, so it shows up with a real pid in the dashboard and `zen sessions ls`. **Synthetic end-of-session line.** `RemoveSession` takes an optional reason; before the session moves to the ended list it appends an Info-level `Session ended[: reason]` entry through the normal log path (released outside `m_Lock`). Current reasons: `client request` (HTTP DELETE), `server shutdown` (self-session), `process exited (...)` (liveness). **UE_LOGFMT structured entries.** `POST /sessions/{id}/log` now accepts `{level, logger, format, fields}` alongside the existing `{level, logger, message}` shape. New `logtemplate.{h,cpp}` implements UE's `StructuredLog.cpp` template grammar (field paths with `.name` / `[N]`, `{{`/`}}` escapes, `$text` / `$format` / `$locformat` object conventions, bounded recursion). Renders to a displayable message at intake while persisting raw format + fields so a future UI can drill into fields without another schema bump. Hot path is zero-alloc — renders into `ExtendableStringBuilder<256>` using stack-buffered `Oid::ToString` / `IoHash::ToHexString` overloads. UI shows a `{…}` marker with the raw template + JSON-pretty fields on hover. **Parent sessions.** `SessionInfo` gains `parent_session_id`; hub-managed storage server child processes inherit the hub's session id via `--parent-session=<id>`. `ZEN_SESSIONS_URL` env var becomes a fallback for `--sessions-url` / config when neither is provided. The in-process session log sink is disabled when a remote sessions target is configured (logs flow through `SessionsServiceClient` instead). The sessions UI groups child sessions under their parent (collapsible/expandable, sorts as a unit, supports nesting). **Platform reporting.** `SessionInfo` gains `Platform`, flowed end-to-end: client auto-fills via `GetRuntimePlatformName()`, server persists in `info.cb` (`plat`) and emits on GET. UI renders as a SimpleIcons-style inline SVG (windows / macOS / iOS / linux / wine / android / playstation / xbox / nintendo) with case-insensitive alias resolution (Win32/Win64, PS4/PS5, XSX/XSS, NintendoSwitch, iPhone/iPad, Darwin/OSX). Unknown values fall back to text; sorting runs on the underlying string. **WebSocket log streaming.** Sessions UI moves from 2 s polling to a WebSocket push model. New `WsSubscriber` has a stable id + helper methods. UI caps the log-line DOM at 5 000 entries with a shared cursor-regression helper, factored out of two call sites. Per-broadcast allocations trimmed on the push path; fixed a stack overrun in the WS log broadcast hex-id buffer. **Log memory.** `LogEntry::Level` is now `logging::LogLevel` (1 byte) instead of `std::string` (~32 B) — saves ~310 KB per full 10 k-entry deque and eliminates a per-message allocation in the in-proc sink. On-disk format writes an int32 and accepts either int or legacy string on read. `LogEntry` strings now live in a `MemoryArena`; logger names are interned across the deque. `SessionLog::Append` and `WriteSessionInfoFile` drop their `UniqueBuffer` round-trip and write `CbObject::GetView()` straight through `BasicFile` / `SafeWriteFile`. Multi-entry `POST /log` batched under one lock + one push. **In-proc log timestamps.** `InProcSessionLogSink::TimePointToDateTime` previously preserved only whole seconds, so every in-proc entry rendered at `.000` ms in the dashboard and `zen sessions tail`. It now adds the sub-second part (nanoseconds → 100 ns ticks) to keep ms precision end-to-end. **UI.** Side "Session Details" panel is gone — its info is inline in the table (appname, mode, platform, id, timestamps, this/log pills, active dot). Bottom panel is a tabbed `Log | Metadata` view with a right-side "Session Information" panel beside metadata; log-only controls (filter, newest-first, follow, log-level filter, expand/collapse) hide when Metadata is active, polling keeps running across tab switches. Wide-mode toggle fills the viewport edge-to-edge. Log lines show the logger category; timestamps render in 24 h with zero-padded fields regardless of locale. Sessions list defaults to All / 10 per page / created-desc, gains click-to-sort headers on the full dataset, a header filter box, and a pager aligned to the table's right edge. Duplicate auto-injected `<h1>Sessions</h1>` removed. ## `zen sessions` CLI New command tree on the `zen` client for inspecting the sessions service from the terminal: - **`zen sessions ls`** — lists sessions (active first, ended next; newest-first within each group) with id, status, app/mode, pid, created, duration, and log count. Supports `--status active|ended|all` (default `all`). - **`zen sessions status`** — prints the sessions service summary: self id, active / ended counts, and the read/write/delete/list/request/bad-request counters from `/stats/sessions`. - **`zen sessions tail [session]`** — tails a session's log. With no argument it tails zenserver's own session (resolved via `/sessions/list`'s `self_id`); an explicit 24-hex id targets any session, including ended ones (historical replay). `--lines N` (default 50, 0 = all buffered) trims the initial dump client-side. `--follow` prefers a WebSocket push subscription on `/sessions/ws` for sub-second latency; on upgrade failure (older server, blocked port, unix-socket transport) it falls back to HTTP cursor polling at `--interval-ms` (default 500), with sleeps chunked to 50 ms so Ctrl-C reacts quickly. Output matches `zen::logging::FullFormatter` (`[YY-MM-DD HH:MM:SS.mmm] [lvl] [logger] message`); on a TTY the level is colored and the logger is bold, with continuation lines indented under the message column using the *visible* prefix width. 404 surfaces as `(session ended)` and connection errors as `(server gone)` — both clean exits, so stopping the server mid-tail no longer prints a stack trace. - **`zen sessions ui`** — opens `<host>/dashboard/?page=sessions` in the user's default browser. Rejects unix-socket hosts. A small `ZenServiceClient::IsUnixSocket()` helper now wraps the unix-socket check used by `ui`, `sessions tail` (WS path), and `sessions ui`. ## Logging `BacklogSink` captures early-startup log entries in a fixed-capacity ring so late-attached sinks (session sink, file sink) can replay them. Detaches from the broadcast list when disabled; backed by destructor-only cleanup (no `unique_ptr` indirection per entry). Tuned defaults so the backlog covers typical bring-up without unbounded growth. ## `zen trace serve` viewer - Compact timeline mode for high-density views. - New `TRACE_INT_VALUE` / `TRACE_FLOAT_VALUE` counter trace points + a counters page in the viewer. - Callsite tables collapsed into a single tabbed panel. - Lossless `Oid <-> Guid` bridge for trace session ids; trace `SessionId` plumbed through. - `tourist` parser hardening: bounds-check `BufferStream::read`, validate `Type::info_size` before `patch()`, convert `parse_important_aux` to a loop (avoids deep recursion), widen `ParserPool` index to `uint32`, bounds-check field offsets in the dispatcher, pin `Types::parse` buffer up-front. ## `MemoryArena` Configurable chunk size, inline chunk list, oversize requests routed to truly-dedicated chunks (no slack waste, no fragmentation when one allocation is much larger than the chunk). ## Allocation cleanups across hot paths - `zenhttp::HttpRequestRouter::HandleRequest` and `FormatPackageMessageInternal`: drop heap allocations. - Compact-binary validation: `eastl::fixed_vector` + `eastl::sort`; eliminate `std::vector` churn. - `zenserverprocess`: trim transient allocations in spawn paths. - Sessions HTTP intake / broadcast: drop transient `std::string` allocs.
Diffstat (limited to 'src/zenserver/sessions/sessions.cpp')
-rw-r--r--src/zenserver/sessions/sessions.cpp1166
1 files changed, 1120 insertions, 46 deletions
diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp
index 9d4e3120c..470117c6a 100644
--- a/src/zenserver/sessions/sessions.cpp
+++ b/src/zenserver/sessions/sessions.cpp
@@ -3,59 +3,733 @@
#include "sessions.h"
#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <mutex>
+
namespace zen {
using namespace std::literals;
+namespace {
+
+ // Per-session log file layout:
+ // [LogFileHeader][Record]*
+ // where Record = [uint32_t ByteLength][CbObject bytes of ByteLength].
+ // Records are written in order. A partial trailing record (short write after
+ // crash) is ignored on load.
+ constexpr uint32_t kLogFileMagic = 0x5A534C47u; // 'ZSLG'
+ constexpr uint32_t kLogFileVersion = 1;
+
+#pragma pack(push, 1)
+ struct LogFileHeader
+ {
+ uint32_t Magic;
+ uint32_t Version;
+ };
+#pragma pack(pop)
+
+ constexpr uint64_t kLogFileHeaderSize = sizeof(LogFileHeader);
+
+ void WriteLogEntryFields(CbObjectWriter& Writer, const SessionsService::LogEntryInput& Entry)
+ {
+ Writer << "ts" << Entry.Timestamp;
+ if (Entry.Level != logging::Off)
+ {
+ // Store as a small integer (int8_t range) rather than a string —
+ // fixed-width, one byte in the serialized CbObject.
+ Writer << "lvl" << static_cast<int32_t>(Entry.Level);
+ }
+ if (!Entry.LoggerName.empty())
+ {
+ Writer << "cat" << Entry.LoggerName;
+ }
+ if (!Entry.Message.empty())
+ {
+ Writer << "msg" << Entry.Message;
+ }
+ // Structured-log template + fields. Only present for UE_LOGFMT-shaped
+ // entries; Message already holds the rendered text for those so a
+ // reader that ignores these two can still display the line.
+ if (!Entry.Format.empty())
+ {
+ Writer << "fmt" << Entry.Format;
+ if (Entry.Fields.GetSize() > 0)
+ {
+ Writer.AddObject("flds"sv, Entry.Fields);
+ }
+ }
+ }
+
+ // Parse a serialized record into an input form. The string_views in
+ // the result borrow from `Obj`'s underlying buffer; the caller must
+ // keep that buffer alive (or arena-copy via PreloadEntries) before
+ // the views are used.
+ bool ReadLogEntry(CbObjectView Obj, SessionsService::LogEntryInput& OutInput)
+ {
+ CbFieldView TsField = Obj["ts"sv];
+ if (!TsField)
+ {
+ return false;
+ }
+ OutInput.Timestamp = TsField.AsDateTime();
+
+ // New format: integer. Legacy format (pre-refactor log.bin files on
+ // this same branch): string. Accept either so existing persisted
+ // entries keep their level when loaded.
+ CbFieldView LvlField = Obj["lvl"sv];
+ if (LvlField.IsInteger())
+ {
+ const int32_t Lvl = LvlField.AsInt32();
+ if (Lvl >= 0 && Lvl < logging::LogLevelCount)
+ {
+ OutInput.Level = static_cast<logging::LogLevel>(Lvl);
+ }
+ }
+ else if (LvlField.IsString())
+ {
+ OutInput.Level = logging::ParseLogLevelString(LvlField.AsString());
+ }
+
+ OutInput.LoggerName = Obj["cat"sv].AsString();
+ OutInput.Message = Obj["msg"sv].AsString();
+ OutInput.Format = Obj["fmt"sv].AsString();
+ if (CbObjectView FieldsView = Obj["flds"sv].AsObjectView(); FieldsView.GetSize() > 0)
+ {
+ OutInput.Fields = CbObject::Clone(FieldsView);
+ }
+ return true;
+ }
+
+ void WriteSessionInfoFields(CbObjectWriter& Writer, const SessionsService::SessionInfo& Info)
+ {
+ Writer << "id" << Info.Id;
+ if (!Info.AppName.empty())
+ {
+ Writer << "app" << Info.AppName;
+ }
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
+ if (!Info.Platform.empty())
+ {
+ Writer << "plat" << Info.Platform;
+ }
+ if (Info.ClientPid != 0)
+ {
+ Writer << "pid" << Info.ClientPid;
+ }
+ if (Info.ParentSessionId != Oid::Zero)
+ {
+ Writer << "parent_session_id" << Info.ParentSessionId;
+ }
+ if (Info.JobId != Oid::Zero)
+ {
+ Writer << "jobid" << Info.JobId;
+ }
+ Writer << "ca" << Info.CreatedAt;
+ Writer << "ua" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ea" << Info.EndedAt;
+ }
+ if (Info.Metadata.GetSize() > 0)
+ {
+ Writer.AddObject("meta"sv, Info.Metadata);
+ }
+ }
+
+ bool ReadSessionInfo(CbObjectView Obj, SessionsService::SessionInfo& OutInfo)
+ {
+ OutInfo.Id = Obj["id"sv].AsObjectId();
+ if (OutInfo.Id == Oid::Zero)
+ {
+ return false;
+ }
+ OutInfo.AppName = std::string(Obj["app"sv].AsString());
+ OutInfo.Mode = std::string(Obj["mode"sv].AsString());
+ OutInfo.Platform = std::string(Obj["plat"sv].AsString());
+ OutInfo.ClientPid = Obj["pid"sv].AsUInt32();
+ OutInfo.ParentSessionId = Obj["parent_session_id"sv].AsObjectId();
+ OutInfo.JobId = Obj["jobid"sv].AsObjectId();
+ OutInfo.CreatedAt = Obj["ca"sv].AsDateTime();
+ OutInfo.UpdatedAt = Obj["ua"sv].AsDateTime();
+ OutInfo.EndedAt = Obj["ea"sv].AsDateTime(DateTime{0});
+ CbObjectView MetaView = Obj["meta"sv].AsObjectView();
+ if (MetaView.GetSize() > 0)
+ {
+ OutInfo.Metadata = CbObject::Clone(MetaView);
+ }
+ return true;
+ }
+
+ std::filesystem::path SessionDir(const std::filesystem::path& Root, const Oid& Id) { return Root / Id.ToString(); }
+
+#if ZEN_PLATFORM_WINDOWS
+ // Turn a Windows process exit code into a human-friendly termination
+ // reason. Most abnormal terminations surface as NTSTATUS values (high
+ // bit set); the ones below are what you'll actually encounter in the
+ // wild. Anything we don't recognize falls through to a formatted hex
+ // (NTSTATUS-shaped) or decimal (application-level) exit code.
+ std::string DescribeWindowsExitCode(uint32_t ExitCode)
+ {
+ struct Named
+ {
+ uint32_t Code;
+ std::string_view Name;
+ };
+ using namespace std::literals;
+ static constexpr Named kKnown[] = {
+ {0xC000013Au, "interrupted (Ctrl-C)"sv},
+ {0xC0000005u, "access violation"sv},
+ {0xC000001Du, "illegal instruction"sv},
+ {0xC0000094u, "integer divide by zero"sv},
+ {0xC0000096u, "privileged instruction"sv},
+ {0xC00000FDu, "stack overflow"sv},
+ {0xC0000409u, "stack buffer overrun"sv},
+ {0xC0000374u, "heap corruption"sv},
+ {0xC0000135u, "DLL not found"sv},
+ {0xC0000142u, "DLL initialization failed"sv},
+ {0xC000007Bu, "invalid image format"sv},
+ {0xC0000420u, "assertion failure"sv},
+ {0xC0000008u, "invalid handle"sv},
+ {0xC000008Eu, "float divide by zero"sv},
+ {0xC0000091u, "float overflow"sv},
+ {0xC0000093u, "float underflow"sv},
+ {0x80000003u, "breakpoint"sv},
+ {0x40000015u, "fatal app exit"sv},
+ };
+ for (const Named& Entry : kKnown)
+ {
+ if (Entry.Code == ExitCode)
+ {
+ return fmt::format("process exited ({}, exit code 0x{:08X})", Entry.Name, ExitCode);
+ }
+ }
+ // NTSTATUS-shaped codes have the high bit set; show them as hex so
+ // they're recognizable (and matchable against Microsoft's doc tables).
+ if ((ExitCode & 0x80000000u) != 0)
+ {
+ return fmt::format("process exited (exit code 0x{:08X})", ExitCode);
+ }
+ return fmt::format("process exited (exit code {})", ExitCode);
+ }
+#endif
+
+} // namespace
+
class SessionLog : public TRefCounted<SessionLog>
{
public:
- SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); }
+ explicit SessionLog(std::filesystem::path LogFilePath);
+
+ void Append(const SessionsService::LogEntryInput& Entry);
+
+ // LoadTail returns input-form entries: their string_views borrow
+ // from m_OwnedBuffers (held internally) so the caller's PreloadEntries
+ // can intern/copy them into the session arena before the buffers are
+ // dropped at LoadResult destruction.
+ struct LoadResult
+ {
+ std::vector<SessionsService::LogEntryInput> TailEntries;
+ // Backing memory for the views in TailEntries. Each ParsedRecord
+ // keeps a CbObject alive whose payload bytes back the strings.
+ std::vector<CbObject> OwnedBuffers;
+ uint64_t TotalCount = 0;
+ };
+ LoadResult LoadTail(size_t MaxEntries);
private:
- BasicFile m_LogFile;
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
+ std::filesystem::path m_Path;
+ std::mutex m_Mutex;
+ BasicFile m_File;
+ uint64_t m_WriteOffset = 0;
+ bool m_Enabled = false;
};
-class SessionLogStore
+SessionLog::SessionLog(std::filesystem::path LogFilePath) : m_Path(std::move(LogFilePath))
{
-public:
- SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {}
+ std::error_code Ec;
+ std::filesystem::create_directories(m_Path.parent_path(), Ec);
- ~SessionLogStore() = default;
+ m_File.Open(m_Path, BasicFile::Mode::kWrite, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session log '{}' could not be opened: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
- Ref<SessionLog> GetLogForSession(const Oid& SessionId)
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ m_File.Close();
+ ZEN_WARN("Session log '{}' could not be sized: {} - persistence disabled", m_Path, Ec.message());
+ return;
}
- Ref<SessionLog> CreateLogForSession(const Oid& SessionId)
+ LogFileHeader Header{};
+ bool NeedsInit = Size < kLogFileHeaderSize;
+ if (!NeedsInit)
+ {
+ // Read is throwing-only; guard so a read failure doesn't escape.
+ try
+ {
+ m_File.Read(&Header, sizeof(Header), 0);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' header read failed: {} - reinitializing", m_Path, E.what());
+ NeedsInit = true;
+ }
+ if (!NeedsInit && (Header.Magic != kLogFileMagic || Header.Version != kLogFileVersion))
+ {
+ NeedsInit = true;
+ }
+ }
+
+ if (NeedsInit)
+ {
+ m_File.SetFileSize(0);
+ Header = LogFileHeader{.Magic = kLogFileMagic, .Version = kLogFileVersion};
+ m_File.Write(&Header, sizeof(Header), 0, Ec);
+ if (Ec)
+ {
+ m_File.Close();
+ ZEN_WARN("Session log '{}' header write failed: {} - persistence disabled", m_Path, Ec.message());
+ return;
+ }
+ m_WriteOffset = kLogFileHeaderSize;
+ }
+ else
+ {
+ m_WriteOffset = Size;
+ }
+
+ m_Enabled = true;
+}
+
+void
+SessionLog::Append(const SessionsService::LogEntryInput& Entry)
+{
+ if (!m_Enabled)
{
- // For now, just return a new log for each session. We can implement actual log storage and retrieval later.
- return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log")));
+ return;
}
+ CbObjectWriter Writer;
+ WriteLogEntryFields(Writer, Entry);
+ CbObject Obj = Writer.Save();
+
+ // Write directly from the CbObject's owned buffer — no need to allocate
+ // a fresh UniqueBuffer and memcpy just to hand the bytes to BasicFile::Write.
+ const MemoryView View = Obj.GetView();
+ const uint64_t ObjSize = View.GetSize();
+ if (ObjSize == 0 || ObjSize > std::numeric_limits<uint32_t>::max())
+ {
+ return;
+ }
+ const uint32_t Len = static_cast<uint32_t>(ObjSize);
+
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ std::error_code Ec;
+ m_File.Write(&Len, sizeof(Len), m_WriteOffset, Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_File.Write(View.GetData(), ObjSize, m_WriteOffset + sizeof(Len), Ec);
+ if (Ec)
+ {
+ return;
+ }
+ m_WriteOffset += sizeof(Len) + ObjSize;
+}
+
+SessionLog::LoadResult
+SessionLog::LoadTail(size_t MaxEntries)
+{
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ LoadResult Result;
+
+ if (!m_Enabled)
+ {
+ return Result;
+ }
+
+ std::error_code Ec;
+ const uint64_t Size = m_File.FileSize(Ec);
+ if (Ec || Size <= kLogFileHeaderSize)
+ {
+ return Result;
+ }
+
+ IoBuffer Buffer;
+ try
+ {
+ Buffer = m_File.ReadRange(kLogFileHeaderSize, Size - kLogFileHeaderSize);
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Session log '{}' tail read failed: {}", m_Path, E.what());
+ return Result;
+ }
+ const uint8_t* Data = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ const uint64_t DataSize = Buffer.GetSize();
+
+ // Walk all valid record positions, ignoring any partial trailing record.
+ struct RecRef
+ {
+ const uint8_t* Ptr;
+ uint32_t Len;
+ };
+ std::vector<RecRef> Records;
+ uint64_t Pos = 0;
+ while (Pos + sizeof(uint32_t) <= DataSize)
+ {
+ uint32_t RecLen = 0;
+ std::memcpy(&RecLen, Data + Pos, sizeof(RecLen));
+ const uint64_t Next = Pos + sizeof(uint32_t) + RecLen;
+ if (RecLen == 0 || Next > DataSize)
+ {
+ break;
+ }
+ Records.push_back(RecRef{.Ptr = Data + Pos + sizeof(uint32_t), .Len = RecLen});
+ Pos = Next;
+ }
+
+ // Parse every record so TotalCount reflects how many actually decode
+ // — cursors anchor to this number, and counting CB-corrupt records
+ // would shift subsequent cursor math off. Only the trailing window
+ // of size MaxEntries is materialized into TailEntries; head records
+ // are parsed purely for the count and discarded.
+ const size_t TailStart = Records.size() > MaxEntries ? Records.size() - MaxEntries : 0;
+ Result.TailEntries.reserve(Records.size() - TailStart);
+ Result.OwnedBuffers.reserve(Records.size() - TailStart);
+
+ for (size_t i = 0; i < Records.size(); ++i)
+ {
+ try
+ {
+ IoBuffer RecBuf = IoBufferBuilder::MakeCloneFromMemory(MemoryView(Records[i].Ptr, Records[i].Len));
+ CbObject Obj = LoadCompactBinaryObject(std::move(RecBuf));
+
+ SessionsService::LogEntryInput Input;
+ if (ReadLogEntry(Obj, Input))
+ {
+ ++Result.TotalCount;
+ if (i >= TailStart)
+ {
+ // Keep the CbObject alive for as long as the views
+ // in the input are needed — PreloadEntries will copy
+ // the strings into the session arena and we drop the
+ // buffer set when LoadResult is destroyed.
+ Result.TailEntries.push_back(std::move(Input));
+ Result.OwnedBuffers.push_back(std::move(Obj));
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ // Skip malformed record — does not contribute to TotalCount.
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class SessionLogStore
+{
+public:
+ explicit SessionLogStore(std::filesystem::path StoragePath);
+
+ Ref<SessionLog> GetOrCreateLogForSession(const Oid& SessionId);
+ void WriteSessionInfoFile(const SessionsService::SessionInfo& Info);
+ void DeleteSession(const Oid& SessionId);
+ uint64_t GetSessionSize(const Oid& SessionId) const;
+
+ struct PersistedSession
+ {
+ SessionsService::SessionInfo Info;
+ std::filesystem::path LogPath;
+ };
+ std::vector<PersistedSession> Scan() const;
+
private:
+ static LoggerRef Log()
+ {
+ static LoggerRef L(logging::Get("sessions"));
+ return L;
+ }
+
std::filesystem::path m_StoragePath;
};
-SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info)
+SessionLogStore::SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath))
{
+ std::error_code Ec;
+ std::filesystem::create_directories(m_StoragePath, Ec);
+}
+
+Ref<SessionLog>
+SessionLogStore::GetOrCreateLogForSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+ return Ref(new SessionLog(Dir / "log.bin"));
+}
+
+void
+SessionLogStore::DeleteSession(const Oid& SessionId)
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ std::filesystem::remove_all(Dir, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to remove session directory '{}': {}", Dir, Ec.message());
+ }
+}
+
+uint64_t
+SessionLogStore::GetSessionSize(const Oid& SessionId) const
+{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, SessionId);
+ std::error_code Ec;
+ uint64_t Total = 0;
+ std::filesystem::directory_iterator It{Dir, Ec};
+ if (Ec)
+ {
+ return 0;
+ }
+ for (const std::filesystem::directory_entry& Entry : It)
+ {
+ std::error_code FileEc;
+ if (Entry.is_regular_file(FileEc))
+ {
+ const uintmax_t Size = Entry.file_size(FileEc);
+ if (!FileEc)
+ {
+ Total += uint64_t(Size);
+ }
+ }
+ }
+ return Total;
}
-SessionsService::Session::~Session() = default;
void
-SessionsService::Session::AppendLog(LogEntry Entry)
+SessionLogStore::WriteSessionInfoFile(const SessionsService::SessionInfo& Info)
{
+ const std::filesystem::path Dir = SessionDir(m_StoragePath, Info.Id);
+ std::error_code Ec;
+ std::filesystem::create_directories(Dir, Ec);
+
+ CbObjectWriter Writer;
+ WriteSessionInfoFields(Writer, Info);
+ CbObject Obj = Writer.Save();
+ const MemoryView View = Obj.GetView();
+ if (View.GetSize() == 0)
+ {
+ return;
+ }
+ TemporaryFile::SafeWriteFile(Dir / "info.cb", View, Ec);
+}
+
+std::vector<SessionLogStore::PersistedSession>
+SessionLogStore::Scan() const
+{
+ std::vector<PersistedSession> Result;
+ std::error_code Ec;
+
+ if (!std::filesystem::exists(m_StoragePath, Ec))
+ {
+ return Result;
+ }
+
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator{m_StoragePath, Ec})
+ {
+ if (Ec || !Entry.is_directory(Ec))
+ {
+ continue;
+ }
+
+ const std::filesystem::path InfoPath = Entry.path() / "info.cb";
+ const std::filesystem::path LogPath = Entry.path() / "log.bin";
+
+ if (!std::filesystem::exists(InfoPath, Ec))
+ {
+ continue;
+ }
+
+ try
+ {
+ BasicFile InfoFile;
+ std::error_code OpenEc;
+ InfoFile.Open(InfoPath, BasicFile::Mode::kRead, OpenEc);
+ if (OpenEc)
+ {
+ continue;
+ }
+ IoBuffer InfoBuf = InfoFile.ReadAll();
+ if (InfoBuf.GetSize() == 0)
+ {
+ continue;
+ }
+ CbObject InfoObj = LoadCompactBinaryObject(std::move(InfoBuf));
+ PersistedSession PS{.Info = SessionsService::SessionInfo{
+ .Id = Oid::Zero,
+ .CreatedAt = DateTime{0},
+ .UpdatedAt = DateTime{0},
+ }};
+ if (!ReadSessionInfo(InfoObj, PS.Info))
+ {
+ continue;
+ }
+ PS.LogPath = LogPath;
+ Result.push_back(std::move(PS));
+ }
+ catch (const std::exception& E)
+ {
+ ZEN_WARN("Skipping session directory '{}': {}", Entry.path(), E.what());
+ }
+ }
+
+ return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+SessionsService::Session::Session(const SessionInfo& Info, Ref<SessionLog> Log, ProcessHandle ClientProcess)
+: m_Info(Info)
+, m_Log(std::move(Log))
+, m_ClientProcess(std::move(ClientProcess))
+{
+}
+SessionsService::Session::~Session() = default;
+
+const char*
+SessionsService::Session::InternLoggerNameLocked(std::string_view Name)
+{
+ if (Name.empty())
+ {
+ return "";
+ }
+ if (auto It = m_InternedLoggerNames.find(Name); It != m_InternedLoggerNames.end())
+ {
+ return It->second;
+ }
+ const char* Arena = m_LogArena.DuplicateString(Name);
+ // The map's key view borrows from Arena (which lives as long as the
+ // session does), so it's safe to outlive `Name`.
+ m_InternedLoggerNames.emplace(std::string_view{Arena, Name.size()}, Arena);
+ return Arena;
+}
+
+const char*
+SessionsService::Session::AllocateLogStringLocked(std::string_view Str)
+{
+ if (Str.empty())
+ {
+ return "";
+ }
+ return m_LogArena.DuplicateString(Str);
+}
+
+uint64_t
+SessionsService::Session::AppendLog(LogEntryInput Input)
+{
+ // Persist first (outside the deque lock ordering) so disk I/O doesn't
+ // starve cursor readers holding the shared lock. The SessionLog has its
+ // own internal mutex that serializes file writes.
+ if (m_Log)
+ {
+ m_Log->Append(Input);
+ }
+
RwLock::ExclusiveLockScope Lock(m_LogLock);
- m_LogEntries.push_back(std::move(Entry));
- ++m_TotalAppended;
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ const uint64_t NewCursor = ++m_TotalAppended;
while (m_LogEntries.size() > MaxLogEntries)
{
m_LogEntries.pop_front();
}
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::Session::AppendLogBatch(std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+
+ // Persist first (per-entry; SessionLog's internal mutex serializes
+ // these writes). We do this outside m_LogLock so file I/O doesn't
+ // stall cursor readers.
+ if (m_Log)
+ {
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_Log->Append(Input);
+ }
+ }
+
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ for (LogEntryInput& Input : Inputs)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = std::move(Input.Fields);
+ ++m_TotalAppended;
+ }
+ while (m_LogEntries.size() > MaxLogEntries)
+ {
+ m_LogEntries.pop_front();
+ }
+ return m_TotalAppended;
+}
+
+void
+SessionsService::Session::PreloadEntries(std::span<const LogEntryInput> Tail, uint64_t TotalCount)
+{
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ m_LogEntries.clear();
+ for (const LogEntryInput& Input : Tail)
+ {
+ m_LogEntries.emplace_back();
+ LogEntry& Entry = m_LogEntries.back();
+ Entry.Timestamp = Input.Timestamp;
+ Entry.Level = Input.Level;
+ Entry.LoggerName = InternLoggerNameLocked(Input.LoggerName);
+ Entry.Message = AllocateLogStringLocked(Input.Message);
+ Entry.Format = AllocateLogStringLocked(Input.Format);
+ Entry.Fields = Input.Fields;
+ }
+ m_TotalAppended = TotalCount;
}
std::vector<SessionsService::LogEntry>
@@ -118,17 +792,134 @@ SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const
};
}
+uint64_t
+SessionsService::AppendLog(const Oid& SessionId, LogEntryInput Input)
+{
+ // Resolve the session without holding any external lock — GetSession
+ // acquires m_Lock shared briefly and returns a ref-counted handle.
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLog(std::move(Input));
+
+ // Fire after Session::m_LogLock is released (inside AppendLog) so the
+ // callback can safely call back into this service (e.g. to resolve
+ // the session again and fetch the delta for its subscribers) without
+ // any nested-lock concerns.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+uint64_t
+SessionsService::AppendLogBatch(const Oid& SessionId, std::span<LogEntryInput> Inputs)
+{
+ if (Inputs.empty())
+ {
+ return 0;
+ }
+ Ref<Session> Target = GetSession(SessionId);
+ if (!Target)
+ {
+ return 0;
+ }
+
+ const uint64_t NewCursor = Target->AppendLogBatch(Inputs);
+
+ // One callback fires for the whole batch — subscribers see all
+ // entries in a single delta rather than N separate deltas. Fired
+ // after Session::m_LogLock is released for the same reason as the
+ // single-entry path.
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, NewCursor);
+ }
+ return NewCursor;
+}
+
+void
+SessionsService::SetLogAppendedCallback(LogAppendedCallback Callback)
+{
+ m_LogAppendedCallback = std::move(Callback);
+}
+
//////////////////////////////////////////////////////////////////////////
-SessionsService::SessionsService() : m_Log(logging::Get("sessions"))
+SessionsService::SessionsService(std::filesystem::path StorageRoot) : m_Log(logging::Get("sessions"))
{
+ if (StorageRoot.empty())
+ {
+ return;
+ }
+
+ m_SessionLogs = std::make_unique<SessionLogStore>(StorageRoot);
+
+ // Load all previously-persisted sessions as ended sessions. Their log
+ // tails are preloaded into the in-memory deque so the UI can view them.
+ std::vector<SessionLogStore::PersistedSession> Persisted = m_SessionLogs->Scan();
+ for (SessionLogStore::PersistedSession& PS : Persisted)
+ {
+ // Sessions that were active at shutdown need a synthetic ended time so
+ // they sort correctly in the UI.
+ if (PS.Info.EndedAt.GetTicks() == 0)
+ {
+ PS.Info.EndedAt = PS.Info.UpdatedAt;
+ }
+
+ Ref<Session> S = Ref(new Session(PS.Info));
+
+ // Load the log tail (no SessionLog attached — historical sessions do
+ // not receive new appends and the file handle is released here).
+ // PreloadEntries copies/interns each input's strings into the
+ // session's arena, after which Loaded.OwnedBuffers can be
+ // released along with the rest of the LoadResult.
+ SessionLog Reader(PS.LogPath);
+ SessionLog::LoadResult Loaded = Reader.LoadTail(Session::MaxLogEntries);
+ S->PreloadEntries(Loaded.TailEntries, Loaded.TotalCount);
+
+ m_EndedSessions.push_back(std::move(S));
+ }
+
+ ZEN_INFO("Sessions service loaded {} persisted session(s) from '{}'", m_EndedSessions.size(), StorageRoot);
}
SessionsService::~SessionsService() = default;
bool
-SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata)
+SessionsService::RegisterSession(const Oid& SessionId,
+ std::string AppName,
+ std::string Mode,
+ std::string Platform,
+ uint32_t ClientPid,
+ const Oid& ParentSessionId,
+ const Oid& JobId,
+ CbObjectView Metadata)
{
+ // Open a process handle eagerly — BEFORE any pid-reuse window opens. On
+ // Windows the handle is tied to the specific process instance, so a
+ // later pid recycle can't fool later liveness checks. Do the syscall
+ // outside the service lock (it's a kernel round-trip). On POSIX this is
+ // effectively a no-op (just stores the pid).
+ ProcessHandle ClientProcess;
+ if (ClientPid != 0)
+ {
+ std::error_code Ec;
+ ClientProcess.Initialize(static_cast<int>(ClientPid), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Session {} registered with pid {} but OpenProcess failed: {} — liveness tracking disabled",
+ SessionId,
+ ClientPid,
+ Ec.message());
+ }
+ }
+
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
// Log outside the lock scope - InProcSessionLogSink calls back into
// GetSession() which acquires m_Lock shared, so logging while holding
// m_Lock exclusively would deadlock.
@@ -141,35 +932,69 @@ SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std:
}
const DateTime Now = DateTime::Now();
- m_Sessions.emplace(SessionId,
- Ref(new Session(SessionInfo{.Id = SessionId,
- .AppName = AppName,
- .Mode = Mode,
- .JobId = JobId,
- .Metadata = CbObject::Clone(Metadata),
- .CreatedAt = Now,
- .UpdatedAt = Now})));
+ SessionInfo Info{.Id = SessionId,
+ .AppName = AppName,
+ .Mode = Mode,
+ .Platform = Platform,
+ .ClientPid = ClientPid,
+ .ParentSessionId = ParentSessionId,
+ .JobId = JobId,
+ .Metadata = CbObject::Clone(Metadata),
+ .CreatedAt = Now,
+ .UpdatedAt = Now};
+
+ Ref<SessionLog> Log;
+ if (m_SessionLogs)
+ {
+ Log = m_SessionLogs->GetOrCreateLogForSession(SessionId);
+ }
+ m_Sessions.emplace(SessionId, Ref(new Session(Info, std::move(Log), std::move(ClientProcess))));
+ PersistedInfo = std::move(Info);
+ }
+
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
}
- ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId);
+ // Include the tracked pid so the log makes it obvious whether the client
+ // opted into liveness tracking (and whether our OpenProcess succeeded).
+ // "pid: 0" = no liveness tracking (remote or client didn't report);
+ // "pid: N" with an immediately-prior warning = OpenProcess failed.
+ ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, Platform: {}, Pid: {}, ParentSessionId: {}, JobId: {})",
+ SessionId,
+ AppName,
+ Mode,
+ Platform,
+ ClientPid,
+ ParentSessionId,
+ JobId);
return true;
}
bool
SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
-
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
{
- return false;
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ auto It = m_Sessions.find(SessionId);
+ if (It == m_Sessions.end())
+ {
+ return false;
+ }
+
+ It.value()->UpdateMetadata(Metadata);
+ PersistedInfo = It.value()->Info();
}
- It.value()->UpdateMetadata(Metadata);
+ if (m_SessionLogs)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
- const SessionInfo& Info = It.value()->Info();
- ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId);
+ ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, PersistedInfo.AppName, PersistedInfo.JobId);
return true;
}
@@ -178,13 +1003,23 @@ SessionsService::GetSession(const Oid& SessionId) const
{
RwLock::SharedLockScope Lock(m_Lock);
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
+ if (auto It = m_Sessions.find(SessionId); It != m_Sessions.end())
{
- return {};
+ return It->second;
+ }
+
+ // Fall back to ended sessions so HTTP consumers can fetch logs/metadata
+ // for sessions that have finished (including sessions loaded from disk on
+ // startup as historical ended sessions).
+ for (const Ref<Session>& Ended : m_EndedSessions)
+ {
+ if (Ended->Info().Id == SessionId)
+ {
+ return Ended;
+ }
}
- return It->second;
+ return {};
}
std::vector<Ref<SessionsService::Session>>
@@ -202,10 +1037,14 @@ SessionsService::GetSessions() const
}
bool
-SessionsService::RemoveSession(const Oid& SessionId)
+SessionsService::RemoveSession(const Oid& SessionId, std::string_view Reason)
{
- std::string RemovedAppName;
- Oid RemovedJobId;
+ std::string RemovedAppName;
+ Oid RemovedJobId;
+ SessionInfo PersistedInfo{.Id = Oid::Zero, .CreatedAt = DateTime{0}, .UpdatedAt = DateTime{0}};
+ bool Persist = false;
+ Ref<Session> Ended;
+ const DateTime EndTime = DateTime::Now();
{
RwLock::ExclusiveLockScope Lock(m_Lock);
@@ -219,13 +1058,50 @@ SessionsService::RemoveSession(const Oid& SessionId)
RemovedAppName = It.value()->Info().AppName;
RemovedJobId = It.value()->Info().JobId;
- Ref<Session> Ended = It.value();
- Ended->SetEndedAt(DateTime::Now());
- m_EndedSessions.push_back(std::move(Ended));
+ Ended = It.value();
+ Ended->SetEndedAt(EndTime);
+ if (m_SessionLogs)
+ {
+ PersistedInfo = Ended->Info();
+ Persist = true;
+ }
+ m_EndedSessions.push_back(Ended);
m_Sessions.erase(It);
}
+ // Synthetic "Session ended" entry is appended *after* m_Lock is
+ // released. AppendLog hits disk via SessionLog::Append, so doing it
+ // inside the exclusive scope would block every other session lookup
+ // / registration / list while the I/O completes. The callback that
+ // pushes the delta to WS subscribers also fires from outside the
+ // lock — same self-deadlock concern as the GetSession path.
+ uint64_t SyntheticEndCursor = 0;
+ {
+ ExtendableStringBuilder<128> MsgBuilder;
+ MsgBuilder << "Session ended"sv;
+ if (!Reason.empty())
+ {
+ MsgBuilder << ": "sv << Reason;
+ }
+ SyntheticEndCursor = Ended->AppendLog(LogEntryInput{
+ .Timestamp = EndTime,
+ .Level = logging::Info,
+ .LoggerName = "sessions"sv,
+ .Message = MsgBuilder.ToView(),
+ });
+ }
+
+ if (m_LogAppendedCallback)
+ {
+ m_LogAppendedCallback(SessionId, SyntheticEndCursor);
+ }
+
+ if (Persist)
+ {
+ m_SessionLogs->WriteSessionInfoFile(PersistedInfo);
+ }
+
ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId);
return true;
}
@@ -244,4 +1120,202 @@ SessionsService::GetSessionCount() const
return m_Sessions.size();
}
+// The "when was this session last relevant" timestamp used for age checks
+// and count-based eviction ordering: EndedAt if set, otherwise UpdatedAt.
+static DateTime
+ReferenceTime(const SessionsService::SessionInfo& Info)
+{
+ return Info.EndedAt.GetTicks() != 0 ? Info.EndedAt : Info.UpdatedAt;
+}
+
+size_t
+SessionsService::CheckProcessLiveness()
+{
+ // Snapshot active sessions under a shared lock, then probe liveness and
+ // drop dead ones without holding the service lock (IsRunning() is a
+ // kernel round-trip and RemoveSession() takes the lock exclusively).
+ std::vector<Ref<Session>> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_Sessions.size());
+ for (const auto& [Id, SessionRef] : m_Sessions)
+ {
+ if (SessionRef->GetClientProcess().IsValid())
+ {
+ Candidates.push_back(SessionRef);
+ }
+ }
+ }
+
+ size_t Ended = 0;
+ for (const Ref<Session>& S : Candidates)
+ {
+ // m_ClientProcess is set once at construction and never mutated, so
+ // reading it here without synchronization is safe.
+ if (!S->GetClientProcess().IsRunning())
+ {
+ // Build the termination reason. On Windows, GetExitCode() returns
+ // the real OS exit code and is cheap right after !IsRunning(); we
+ // map the common NTSTATUS codes (Ctrl-C, access violation, DLL
+ // init failure, …) to human-readable names. On POSIX the exit
+ // code is only populated via Wait() which we never call, so
+ // stick with the plain reason.
+ std::string Reason = "process exited";
+#if ZEN_PLATFORM_WINDOWS
+ const uint32_t ExitCode = static_cast<uint32_t>(S->GetClientProcess().GetExitCode());
+ if (ExitCode != 0)
+ {
+ Reason = DescribeWindowsExitCode(ExitCode);
+ }
+#endif
+ if (RemoveSession(S->Info().Id, Reason))
+ {
+ ++Ended;
+ }
+ }
+ }
+ return Ended;
+}
+
+SessionsService::PruneResult
+SessionsService::PruneExpired(TimeSpan MaxAge, size_t MaxCount, uint64_t MaxStorageBytes)
+{
+ PruneResult Result;
+ std::vector<Oid> ToDelete;
+
+ // Phase 1: age + count pruning (fast; purely in-memory).
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ const uint64_t NowTicks = DateTime::Now().GetTicks();
+ const uint64_t CutoffTicks = NowTicks > MaxAge.GetTicks() ? NowTicks - MaxAge.GetTicks() : 0;
+ const DateTime Cutoff{CutoffTicks};
+
+ // Age-based pruning: drop ended sessions whose reference time is
+ // older than Cutoff.
+ auto ExpiredIt = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ if (ReferenceTime(S->Info()) < Cutoff)
+ {
+ ToDelete.push_back(S->Info().Id);
+ return true;
+ }
+ return false;
+ });
+ Result.ExpiredByAge = size_t(m_EndedSessions.end() - ExpiredIt);
+ m_EndedSessions.erase(ExpiredIt, m_EndedSessions.end());
+
+ // Count-based pruning: keep at most MaxCount sessions total. Active
+ // sessions are never touched; if there are already >= MaxCount active
+ // sessions then all ended sessions get evicted.
+ const size_t ActiveCount = m_Sessions.size();
+ const size_t EndedTarget = MaxCount > ActiveCount ? MaxCount - ActiveCount : 0;
+ if (m_EndedSessions.size() > EndedTarget)
+ {
+ const size_t ToRemove = m_EndedSessions.size() - EndedTarget;
+ // Move the `ToRemove` oldest entries to the front.
+ std::partial_sort(
+ m_EndedSessions.begin(),
+ m_EndedSessions.begin() + ToRemove,
+ m_EndedSessions.end(),
+ [](const Ref<Session>& A, const Ref<Session>& B) { return ReferenceTime(A->Info()) < ReferenceTime(B->Info()); });
+ for (size_t i = 0; i < ToRemove; ++i)
+ {
+ ToDelete.push_back(m_EndedSessions[i]->Info().Id);
+ }
+ m_EndedSessions.erase(m_EndedSessions.begin(), m_EndedSessions.begin() + ToRemove);
+ Result.ExpiredByCount = ToRemove;
+ }
+ }
+
+ // Phase 1 disk deletion, outside the service lock.
+ if (m_SessionLogs)
+ {
+ for (const Oid& Id : ToDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ }
+
+ // Phase 2: storage-footprint pruning. Snapshot remaining ended sessions
+ // (id + reference time) under a shared lock, then stat each directory
+ // outside the lock so we don't hold writers off during filesystem calls.
+ if (!m_SessionLogs)
+ {
+ return Result;
+ }
+
+ struct Candidate
+ {
+ Oid Id;
+ DateTime RefTime;
+ uint64_t Size = 0;
+ };
+ std::vector<Candidate> Candidates;
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Candidates.reserve(m_EndedSessions.size());
+ for (const Ref<Session>& S : m_EndedSessions)
+ {
+ Candidates.push_back(Candidate{.Id = S->Info().Id, .RefTime = ReferenceTime(S->Info())});
+ }
+ }
+
+ uint64_t TotalBytes = 0;
+ for (Candidate& C : Candidates)
+ {
+ C.Size = m_SessionLogs->GetSessionSize(C.Id);
+ TotalBytes += C.Size;
+ }
+
+ if (TotalBytes <= MaxStorageBytes)
+ {
+ return Result;
+ }
+
+ // Oldest first so we evict in chronological order.
+ std::sort(Candidates.begin(), Candidates.end(), [](const Candidate& A, const Candidate& B) { return A.RefTime < B.RefTime; });
+
+ std::vector<Oid> StorageDelete;
+ uint64_t Reclaimed = 0;
+ const uint64_t NeedBytes = TotalBytes - MaxStorageBytes;
+ for (const Candidate& C : Candidates)
+ {
+ if (Reclaimed >= NeedBytes)
+ {
+ break;
+ }
+ StorageDelete.push_back(C.Id);
+ Reclaimed += C.Size;
+ }
+
+ if (StorageDelete.empty())
+ {
+ return Result;
+ }
+
+ // Erase from m_EndedSessions under exclusive lock. Concurrent RemoveSession
+ // calls between the snapshot and here will have inserted new entries at
+ // the back, which we safely leave alone.
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ tsl::robin_map<Oid, uint8_t, Oid::Hasher> IdSet;
+ for (const Oid& Id : StorageDelete)
+ {
+ IdSet[Id] = 1;
+ }
+ auto It = std::remove_if(m_EndedSessions.begin(), m_EndedSessions.end(), [&](const Ref<Session>& S) {
+ return IdSet.contains(S->Info().Id);
+ });
+ m_EndedSessions.erase(It, m_EndedSessions.end());
+ }
+
+ for (const Oid& Id : StorageDelete)
+ {
+ m_SessionLogs->DeleteSession(Id);
+ }
+ Result.ExpiredByStorage = StorageDelete.size();
+
+ return Result;
+}
+
} // namespace zen