diff options
| author | Stefan Boberg <[email protected]> | 2026-03-23 14:19:57 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 14:19:57 +0100 |
| commit | 2a445406e09328cb4cf320300f2678997d6775b7 (patch) | |
| tree | a92f02d94c92144cb6ae32160397298533e4c822 /src/zenutil/sessionsclient.cpp | |
| parent | add hub instance crash recovery (#885) (diff) | |
| download | zen-2a445406e09328cb4cf320300f2678997d6775b7.tar.xz zen-2a445406e09328cb4cf320300f2678997d6775b7.zip | |
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary
This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements.
### Sessions Service
- `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle)
- Storage server registers itself with its own local sessions service on startup
- Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.)
- Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All)
- `--sessions-url` config option for remote session announcement
- In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard
### Session Log Viewer
- POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array
- In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching
- Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring
- Auto-selects the server's own session on page load
### TCP Log Streaming
- `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP
- Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps
- Gathered buffer writes to reduce syscall overhead when flushing batches
- Tests covering basic delivery, multi-line splitting, drop detection, and sequencing
### New Dashboard Pages
- **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session
- **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`)
- **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels
- **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog
### Documentation Page
- In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking
- New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md`
- Dev docs moved to `docs/dev/`
### Infrastructure & Bug Fixes
- **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers
- **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner
- **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection
- Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers
- Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenutil/sessionsclient.cpp')
| -rw-r--r-- | src/zenutil/sessionsclient.cpp | 377 |
1 files changed, 377 insertions, 0 deletions
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp new file mode 100644 index 000000000..c62cc4099 --- /dev/null +++ b/src/zenutil/sessionsclient.cpp @@ -0,0 +1,377 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/sessionsclient.h> + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging/logmsg.h> +#include <zencore/thread.h> +#include <zenhttp/httpclient.h> + +#include <thread> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// +// SessionLogSink — batching log sink that forwards to /sessions/{id}/log +// + +static const char* +LogLevelToString(logging::LogLevel Level) +{ + switch (Level) + { + case logging::Trace: + return "trace"; + case logging::Debug: + return "debug"; + case logging::Info: + return "info"; + case logging::Warn: + return "warn"; + case logging::Err: + return "error"; + case logging::Critical: + return "critical"; + default: + return "info"; + } +} + +struct BufferedLogEntry +{ + enum class Type : uint8_t + { + Log, + Flush, + Shutdown + }; + + Type Type = Type::Log; + std::string Level; + std::string Message; +}; + +class SessionLogSink final : public logging::Sink +{ +public: + SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) + { + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings); + + SetLevel(logging::Info); + + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("SessionLog"); + WorkerLoop(); + }); + } + + ~SessionLogSink() override + { + BufferedLogEntry ShutdownMsg; + ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownMsg)); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } + } + + void Log(const logging::LogMessage& Msg) override + { + BufferedLogEntry Entry; + Entry.Type = BufferedLogEntry::Type::Log; + Entry.Level = LogLevelToString(Msg.GetLevel()); + Entry.Message = std::string(Msg.GetPayload()); + m_Queue.Enqueue(std::move(Entry)); + } + + void Flush() override + { + // Best-effort: enqueue a flush marker so the worker sends any pending entries + BufferedLogEntry FlushMsg; + FlushMsg.Type = BufferedLogEntry::Type::Flush; + m_Queue.Enqueue(std::move(FlushMsg)); + } + + void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override + { + // No formatting needed — we send raw message text + } + +private: + static constexpr size_t BatchSize = 50; + + void WorkerLoop() + { + std::vector<BufferedLogEntry> Batch; + Batch.reserve(BatchSize); + + BufferedLogEntry Msg; + while (m_Queue.WaitAndDequeue(Msg)) + { + if (Msg.Type == BufferedLogEntry::Type::Shutdown) + { + // Drain remaining log entries + BufferedLogEntry Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Remaining)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + + if (Msg.Type == BufferedLogEntry::Type::Flush) + { + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + continue; + } + + // Log entry + Batch.push_back(std::move(Msg)); + + if (Batch.size() >= BatchSize) + { + SendBatch(Batch); + Batch.clear(); + } + else + { + // Drain any additional queued entries without blocking + while (Batch.size() < BatchSize && m_Queue.Size() > 0) + { + BufferedLogEntry Extra; + if (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Shutdown) + { + if (!Batch.empty()) + { + SendBatch(Batch); + } + // Drain remaining + while (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + else if (Extra.Type == BufferedLogEntry::Type::Flush) + { + break; + } + } + } + + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + } + } + } + + void SendBatch(const std::vector<BufferedLogEntry>& Batch) + { + try + { + CbObjectWriter Writer; + Writer.BeginArray("entries"); + for (const BufferedLogEntry& Entry : Batch) + { + Writer.BeginObject(); + Writer << "level" << Entry.Level; + Writer << "message" << Entry.Message; + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); + (void)Result; // Best-effort + } + catch (const std::exception&) + { + // Best-effort — silently discard on failure + } + } + + std::string m_LogPath; + std::unique_ptr<HttpClient> m_Http; + BlockingQueue<BufferedLogEntry> m_Queue; + std::thread m_WorkerThread; +}; + +SessionsServiceClient::SessionsServiceClient(Options Opts) +: m_Log(logging::Get("sessionsclient")) +, m_Options(std::move(Opts)) +, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) +{ + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings); +} + +SessionsServiceClient::~SessionsServiceClient() = default; + +CbObject +SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const +{ + CbObjectWriter Writer; + Writer << "appname" << m_Options.AppName; + if (!m_Options.Mode.empty()) + { + Writer << "mode" << m_Options.Mode; + } + if (m_Options.JobId != Oid::Zero) + { + Writer << "jobid" << m_Options.JobId; + } + if (Metadata.GetSize() > 0) + { + Writer.AddObject("metadata", Metadata); + } + return Writer.Save(); +} + +bool +SessionsServiceClient::Announce(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); + + if (Result.Error) + { + ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + MemoryView View = Body.GetView(); + IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); + + HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); + + if (Result.Error) + { + ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::Remove() +{ + try + { + HttpClient::Response Result = m_Http->Delete(m_SessionPath); + + if (Result.Error) + { + ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +logging::SinkPtr +SessionsServiceClient::CreateLogSink() +{ + std::string LogPath = m_SessionPath + "/log"; + return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); +} + +} // namespace zen |