diff options
| author | Stefan Boberg <[email protected]> | 2026-03-23 14:19:57 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 14:19:57 +0100 |
| commit | 2a445406e09328cb4cf320300f2678997d6775b7 (patch) | |
| tree | a92f02d94c92144cb6ae32160397298533e4c822 /src/zenserver/sessions/httpsessions.cpp | |
| parent | add hub instance crash recovery (#885) (diff) | |
| download | zen-2a445406e09328cb4cf320300f2678997d6775b7.tar.xz zen-2a445406e09328cb4cf320300f2678997d6775b7.zip | |
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary
This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements.
### Sessions Service
- `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle)
- Storage server registers itself with its own local sessions service on startup
- Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.)
- Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All)
- `--sessions-url` config option for remote session announcement
- In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard
### Session Log Viewer
- POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array
- In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching
- Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring
- Auto-selects the server's own session on page load
### TCP Log Streaming
- `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP
- Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps
- Gathered buffer writes to reduce syscall overhead when flushing batches
- Tests covering basic delivery, multi-line splitting, drop detection, and sequencing
### New Dashboard Pages
- **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session
- **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`)
- **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels
- **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog
### Documentation Page
- In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking
- New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md`
- Dev docs moved to `docs/dev/`
### Infrastructure & Bug Fixes
- **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers
- **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner
- **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection
- Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers
- Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 360 |
1 files changed, 332 insertions, 28 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp index 6cf12bea4..429ba98cf 100644 --- a/src/zenserver/sessions/httpsessions.cpp +++ b/src/zenserver/sessions/httpsessions.cpp @@ -3,7 +3,6 @@ #include "httpsessions.h" #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/trace.h> @@ -12,17 +11,22 @@ namespace zen { using namespace std::literals; -HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions) +HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + SessionsService& Sessions, + asio::io_context& IoContext) : m_Log(logging::Get("sessions")) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_Sessions(Sessions) +, m_PushTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { + m_PushTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } @@ -107,12 +111,24 @@ HttpSessionsService::Initialize() HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete); m_Router.RegisterRoute( + "{session_id}/log", + [this](HttpRouterRequest& Req) { SessionLogRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); + m_Router.RegisterRoute( + "ws", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, + HttpVerb::kGet); + m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); + + EnqueuePushTimer(); } static void @@ -123,24 +139,55 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "appname" << Info.AppName; } + if (!Info.Mode.empty()) + { + Writer << "mode" << Info.Mode; + } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "created_at" << Info.CreatedAt; Writer << "updated_at" << Info.UpdatedAt; + if (Info.EndedAt.GetTicks() != 0) + { + Writer << "ended_at" << Info.EndedAt; + } if (Info.Metadata.GetSize() > 0) { - Writer.BeginObject("metadata"); - for (const CbField& Field : Info.Metadata) - { - Writer.AddField(Field); - } - Writer.EndObject(); + Writer.AddObject("metadata"sv, Info.Metadata); } } +CbObject +HttpSessionsService::BuildSessionListResponse() +{ + std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + + CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } + Response.BeginArray("sessions"); + for (const Ref<SessionsService::Session>& Session : Active) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); + } + for (const Ref<SessionsService::Session>& Session : Ended) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); + } + Response.EndArray(); + return Response.Save(); +} + void HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) { @@ -149,16 +196,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) m_SessionsStats.SessionListCount++; m_SessionsStats.RequestCount++; - std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions(); + HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); + std::string_view Status = Params.GetValue("status"sv); + + std::vector<Ref<SessionsService::Session>> Sessions; + if (Status == "ended"sv) + { + Sessions = m_Sessions.GetEndedSessions(); + } + else if (Status == "all"sv) + { + Sessions = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + Sessions.insert(Sessions.end(), Ended.begin(), Ended.end()); + } + else + { + Sessions = m_Sessions.GetSessions(); + } CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } Response.BeginArray("sessions"); for (const Ref<SessionsService::Session>& Session : Sessions) { Response.BeginObject(); - { - WriteSessionInfo(Response, Session->Info()); - } + WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } Response.EndArray(); @@ -187,30 +253,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) case HttpVerb::kPost: case HttpVerb::kPut: { - IoBuffer Payload = ServerRequest.ReadPayload(); - CbObject RequestObject; - - if (Payload.GetSize() > 0) - { - if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All); - ValidationResult != CbValidateError::None) - { - m_SessionsStats.BadRequestCount++; - return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Invalid payload: {}", zen::ToString(ValidationResult))); - } - RequestObject = LoadCompactBinaryObject(Payload); - } + CbObject RequestObject = ServerRequest.ReadPayloadObject(); if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); + std::string Mode(RequestObject["mode"sv].AsString()); Oid JobId = RequestObject["jobid"sv].AsObjectId(); CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); m_SessionsStats.SessionWriteCount++; - if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView)) + if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); } @@ -265,4 +318,255 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) } } +////////////////////////////////////////////////////////////////////////// +// +// Session log +// + +static void +WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) +{ + Writer << "timestamp" << Entry.Timestamp; + if (!Entry.Level.empty()) + { + Writer << "level" << Entry.Level; + } + if (!Entry.Message.empty()) + { + Writer << "message" << Entry.Message; + } + if (Entry.Data.GetSize() > 0) + { + Writer.AddObject("data"sv, Entry.Data); + } +} + +void +HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + + const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); + if (SessionId == Oid::Zero) + { + m_SessionsStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid session id '{}'", Req.GetCapture(1))); + } + + m_SessionsStats.RequestCount++; + + Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId); + if (!Session) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Session '{}' not found", SessionId)); + } + + if (ServerRequest.RequestVerb() == HttpVerb::kPost) + { + m_SessionsStats.SessionWriteCount++; + + if (ServerRequest.RequestContentType() == HttpContentType::kText) + { + // Raw text — split by newlines, one entry per line + IoBuffer Payload = ServerRequest.ReadPayload(); + std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + const DateTime Now = DateTime::Now(); + + size_t Pos = 0; + while (Pos < Text.size()) + { + size_t End = Text.find('\n', Pos); + if (End == std::string_view::npos) + { + End = Text.size(); + } + + std::string_view Line = Text.substr(Pos, End - Pos); + // Strip trailing \r + if (!Line.empty() && Line.back() == '\r') + { + Line.remove_suffix(1); + } + + if (!Line.empty()) + { + Session->AppendLog(SessionsService::LogEntry{ + .Timestamp = Now, + .Message = std::string(Line), + }); + } + + Pos = End + 1; + } + } + else + { + // Structured log (JSON or CbObject) + // Accepts a single record or an "entries" array of records + CbObject RequestObject = ServerRequest.ReadPayloadObject(); + const DateTime Now = DateTime::Now(); + + auto AppendFromObject = [&](CbObjectView Obj) { + std::string Level(Obj["level"sv].AsString()); + std::string Message(Obj["message"sv].AsString()); + CbObjectView DataView = Obj["data"sv].AsObjectView(); + + Session->AppendLog(SessionsService::LogEntry{ + .Timestamp = Now, + .Level = std::move(Level), + .Message = std::move(Message), + .Data = CbObject::Clone(DataView), + }); + }; + + CbFieldView EntriesField = RequestObject["entries"sv]; + if (EntriesField.IsArray()) + { + for (CbFieldView Entry : EntriesField) + { + AppendFromObject(Entry.AsObjectView()); + } + } + else + { + AppendFromObject(RequestObject); + } + } + + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + // GET - return log entries + m_SessionsStats.SessionReadCount++; + + HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); + + // cursor-based retrieval: client passes the cursor from the previous response + // and receives only entries appended since then. + std::string_view CursorStr = Params.GetValue("cursor"sv); + if (!CursorStr.empty()) + { + uint64_t AfterCursor = std::strtoull(std::string(CursorStr).c_str(), nullptr, 10); + + SessionsService::Session::CursorResult Result = Session->GetLogEntriesAfter(AfterCursor); + + CbObjectWriter Response; + Response << "cursor" << Result.Cursor; + Response << "count" << Result.Count; + Response.BeginArray("entries"); + for (const SessionsService::LogEntry& Entry : Result.Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + + // Legacy offset/limit retrieval + uint32_t Limit = 0; + uint32_t Offset = 0; + + if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty()) + { + Limit = uint32_t(std::strtoul(std::string(LimitStr).c_str(), nullptr, 10)); + } + if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty()) + { + Offset = uint32_t(std::strtoul(std::string(OffsetStr).c_str(), nullptr, 10)); + } + + std::vector<SessionsService::LogEntry> Entries = Session->GetLogEntries(Limit, Offset); + + CbObjectWriter Response; + Response << "total" << Session->GetLogCount(); + Response.BeginArray("entries"); + for (const SessionsService::LogEntry& Entry : Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// WebSocket push +// + +void +HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_INFO("Sessions WebSocket client connected"); + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); +} + +void +HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +{ + // No client-to-server messages expected +} + +void +HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} + +void +HttpSessionsService::BroadcastSessions() +{ + std::vector<Ref<WebSocketConnection>> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + + if (Connections.empty()) + { + return; + } + + ExtendableStringBuilder<4096> JsonBuilder; + BuildSessionListResponse().ToJson(JsonBuilder); + std::string_view Json = JsonBuilder.ToView(); + + for (const Ref<WebSocketConnection>& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendText(Json); + } + } +} + +void +HttpSessionsService::EnqueuePushTimer() +{ + m_PushTimer.expires_after(std::chrono::seconds(2)); + m_PushTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + BroadcastSessions(); + EnqueuePushTimer(); + }); +} + } // namespace zen |