aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/sessionsclient.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-23 14:19:57 +0100
committerGitHub Enterprise <[email protected]>2026-03-23 14:19:57 +0100
commit2a445406e09328cb4cf320300f2678997d6775b7 (patch)
treea92f02d94c92144cb6ae32160397298533e4c822 /src/zenutil/sessionsclient.cpp
parentadd hub instance crash recovery (#885) (diff)
downloadzen-2a445406e09328cb4cf320300f2678997d6775b7.tar.xz
zen-2a445406e09328cb4cf320300f2678997d6775b7.zip
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements. ### Sessions Service - `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle) - Storage server registers itself with its own local sessions service on startup - Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.) - Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All) - `--sessions-url` config option for remote session announcement - In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard ### Session Log Viewer - POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array - In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching - Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring - Auto-selects the server's own session on page load ### TCP Log Streaming - `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP - Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps - Gathered buffer writes to reduce syscall overhead when flushing batches - Tests covering basic delivery, multi-line splitting, drop detection, and sequencing ### New Dashboard Pages - **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session - **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`) - **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels - **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog ### Documentation Page - In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking - New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md` - Dev docs moved to `docs/dev/` ### Infrastructure & Bug Fixes - **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers - **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner - **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection - Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers - Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenutil/sessionsclient.cpp')
-rw-r--r--src/zenutil/sessionsclient.cpp377
1 files changed, 377 insertions, 0 deletions
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp
new file mode 100644
index 000000000..c62cc4099
--- /dev/null
+++ b/src/zenutil/sessionsclient.cpp
@@ -0,0 +1,377 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/sessionsclient.h>
+
+#include <zencore/blockingqueue.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging/logmsg.h>
+#include <zencore/thread.h>
+#include <zenhttp/httpclient.h>
+
+#include <thread>
+#include <vector>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// SessionLogSink — batching log sink that forwards to /sessions/{id}/log
+//
+
+static const char*
+LogLevelToString(logging::LogLevel Level)
+{
+ switch (Level)
+ {
+ case logging::Trace:
+ return "trace";
+ case logging::Debug:
+ return "debug";
+ case logging::Info:
+ return "info";
+ case logging::Warn:
+ return "warn";
+ case logging::Err:
+ return "error";
+ case logging::Critical:
+ return "critical";
+ default:
+ return "info";
+ }
+}
+
+struct BufferedLogEntry
+{
+ enum class Type : uint8_t
+ {
+ Log,
+ Flush,
+ Shutdown
+ };
+
+ Type Type = Type::Log;
+ std::string Level;
+ std::string Message;
+};
+
+class SessionLogSink final : public logging::Sink
+{
+public:
+ SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath))
+ {
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(3000);
+ m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings);
+
+ SetLevel(logging::Info);
+
+ m_WorkerThread = std::thread([this]() {
+ zen::SetCurrentThreadName("SessionLog");
+ WorkerLoop();
+ });
+ }
+
+ ~SessionLogSink() override
+ {
+ BufferedLogEntry ShutdownMsg;
+ ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown;
+ m_Queue.Enqueue(std::move(ShutdownMsg));
+
+ if (m_WorkerThread.joinable())
+ {
+ m_WorkerThread.join();
+ }
+ }
+
+ void Log(const logging::LogMessage& Msg) override
+ {
+ BufferedLogEntry Entry;
+ Entry.Type = BufferedLogEntry::Type::Log;
+ Entry.Level = LogLevelToString(Msg.GetLevel());
+ Entry.Message = std::string(Msg.GetPayload());
+ m_Queue.Enqueue(std::move(Entry));
+ }
+
+ void Flush() override
+ {
+ // Best-effort: enqueue a flush marker so the worker sends any pending entries
+ BufferedLogEntry FlushMsg;
+ FlushMsg.Type = BufferedLogEntry::Type::Flush;
+ m_Queue.Enqueue(std::move(FlushMsg));
+ }
+
+ void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override
+ {
+ // No formatting needed — we send raw message text
+ }
+
+private:
+ static constexpr size_t BatchSize = 50;
+
+ void WorkerLoop()
+ {
+ std::vector<BufferedLogEntry> Batch;
+ Batch.reserve(BatchSize);
+
+ BufferedLogEntry Msg;
+ while (m_Queue.WaitAndDequeue(Msg))
+ {
+ if (Msg.Type == BufferedLogEntry::Type::Shutdown)
+ {
+ // Drain remaining log entries
+ BufferedLogEntry Remaining;
+ while (m_Queue.WaitAndDequeue(Remaining))
+ {
+ if (Remaining.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Remaining));
+ }
+ }
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ return;
+ }
+
+ if (Msg.Type == BufferedLogEntry::Type::Flush)
+ {
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ continue;
+ }
+
+ // Log entry
+ Batch.push_back(std::move(Msg));
+
+ if (Batch.size() >= BatchSize)
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ else
+ {
+ // Drain any additional queued entries without blocking
+ while (Batch.size() < BatchSize && m_Queue.Size() > 0)
+ {
+ BufferedLogEntry Extra;
+ if (m_Queue.WaitAndDequeue(Extra))
+ {
+ if (Extra.Type == BufferedLogEntry::Type::Shutdown)
+ {
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ // Drain remaining
+ while (m_Queue.WaitAndDequeue(Extra))
+ {
+ if (Extra.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Extra));
+ }
+ }
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ return;
+ }
+ if (Extra.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Extra));
+ }
+ else if (Extra.Type == BufferedLogEntry::Type::Flush)
+ {
+ break;
+ }
+ }
+ }
+
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ }
+ }
+ }
+
+ void SendBatch(const std::vector<BufferedLogEntry>& Batch)
+ {
+ try
+ {
+ CbObjectWriter Writer;
+ Writer.BeginArray("entries");
+ for (const BufferedLogEntry& Entry : Batch)
+ {
+ Writer.BeginObject();
+ Writer << "level" << Entry.Level;
+ Writer << "message" << Entry.Message;
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save());
+ (void)Result; // Best-effort
+ }
+ catch (const std::exception&)
+ {
+ // Best-effort — silently discard on failure
+ }
+ }
+
+ std::string m_LogPath;
+ std::unique_ptr<HttpClient> m_Http;
+ BlockingQueue<BufferedLogEntry> m_Queue;
+ std::thread m_WorkerThread;
+};
+
+SessionsServiceClient::SessionsServiceClient(Options Opts)
+: m_Log(logging::Get("sessionsclient"))
+, m_Options(std::move(Opts))
+, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId))
+{
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(3000);
+ m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings);
+}
+
+SessionsServiceClient::~SessionsServiceClient() = default;
+
+CbObject
+SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const
+{
+ CbObjectWriter Writer;
+ Writer << "appname" << m_Options.AppName;
+ if (!m_Options.Mode.empty())
+ {
+ Writer << "mode" << m_Options.Mode;
+ }
+ if (m_Options.JobId != Oid::Zero)
+ {
+ Writer << "jobid" << m_Options.JobId;
+ }
+ if (Metadata.GetSize() > 0)
+ {
+ Writer.AddObject("metadata", Metadata);
+ }
+ return Writer.Save();
+}
+
+bool
+SessionsServiceClient::Announce(CbObjectView Metadata)
+{
+ try
+ {
+ CbObject Body = BuildRequestBody(Metadata);
+
+ HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body));
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ ZEN_INFO("session announced to '{}'", m_Options.TargetUrl);
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+bool
+SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
+{
+ try
+ {
+ CbObject Body = BuildRequestBody(Metadata);
+
+ MemoryView View = Body.GetView();
+ IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject);
+
+ HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload);
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+bool
+SessionsServiceClient::Remove()
+{
+ try
+ {
+ HttpClient::Response Result = m_Http->Delete(m_SessionPath);
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ ZEN_INFO("session removed from '{}'", m_Options.TargetUrl);
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+logging::SinkPtr
+SessionsServiceClient::CreateLogSink()
+{
+ std::string LogPath = m_SessionPath + "/log";
+ return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath)));
+}
+
+} // namespace zen