aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/httpsessions.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-23 14:19:57 +0100
committerGitHub Enterprise <[email protected]>2026-03-23 14:19:57 +0100
commit2a445406e09328cb4cf320300f2678997d6775b7 (patch)
treea92f02d94c92144cb6ae32160397298533e4c822 /src/zenserver/sessions/httpsessions.cpp
parentadd hub instance crash recovery (#885) (diff)
downloadzen-2a445406e09328cb4cf320300f2678997d6775b7.tar.xz
zen-2a445406e09328cb4cf320300f2678997d6775b7.zip
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements. ### Sessions Service - `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle) - Storage server registers itself with its own local sessions service on startup - Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.) - Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All) - `--sessions-url` config option for remote session announcement - In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard ### Session Log Viewer - POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array - In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching - Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring - Auto-selects the server's own session on page load ### TCP Log Streaming - `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP - Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps - Gathered buffer writes to reduce syscall overhead when flushing batches - Tests covering basic delivery, multi-line splitting, drop detection, and sequencing ### New Dashboard Pages - **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session - **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`) - **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels - **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog ### Documentation Page - In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking - New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md` - Dev docs moved to `docs/dev/` ### Infrastructure & Bug Fixes - **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers - **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner - **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection - Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers - Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
-rw-r--r--src/zenserver/sessions/httpsessions.cpp360
1 files changed, 332 insertions, 28 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 6cf12bea4..429ba98cf 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -3,7 +3,6 @@
#include "httpsessions.h"
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/trace.h>
@@ -12,17 +11,22 @@
namespace zen {
using namespace std::literals;
-HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions)
+HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
+ HttpStatsService& StatsService,
+ SessionsService& Sessions,
+ asio::io_context& IoContext)
: m_Log(logging::Get("sessions"))
, m_StatusService(StatusService)
, m_StatsService(StatsService)
, m_Sessions(Sessions)
+, m_PushTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ m_PushTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -107,12 +111,24 @@ HttpSessionsService::Initialize()
HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete);
m_Router.RegisterRoute(
+ "{session_id}/log",
+ [this](HttpRouterRequest& Req) { SessionLogRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"",
[this](HttpRouterRequest& Req) { ListSessionsRequest(Req); },
HttpVerb::kGet);
+ m_Router.RegisterRoute(
+ "ws",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
+ HttpVerb::kGet);
+
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+
+ EnqueuePushTimer();
}
static void
@@ -123,24 +139,55 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "appname" << Info.AppName;
}
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
}
Writer << "created_at" << Info.CreatedAt;
Writer << "updated_at" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ended_at" << Info.EndedAt;
+ }
if (Info.Metadata.GetSize() > 0)
{
- Writer.BeginObject("metadata");
- for (const CbField& Field : Info.Metadata)
- {
- Writer.AddField(Field);
- }
- Writer.EndObject();
+ Writer.AddObject("metadata"sv, Info.Metadata);
}
}
+CbObject
+HttpSessionsService::BuildSessionListResponse()
+{
+ std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+
+ CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
+ Response.BeginArray("sessions");
+ for (const Ref<SessionsService::Session>& Session : Active)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
+ }
+ for (const Ref<SessionsService::Session>& Session : Ended)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
+ }
+ Response.EndArray();
+ return Response.Save();
+}
+
void
HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
{
@@ -149,16 +196,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionListCount++;
m_SessionsStats.RequestCount++;
- std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions();
+ HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams();
+ std::string_view Status = Params.GetValue("status"sv);
+
+ std::vector<Ref<SessionsService::Session>> Sessions;
+ if (Status == "ended"sv)
+ {
+ Sessions = m_Sessions.GetEndedSessions();
+ }
+ else if (Status == "all"sv)
+ {
+ Sessions = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+ Sessions.insert(Sessions.end(), Ended.begin(), Ended.end());
+ }
+ else
+ {
+ Sessions = m_Sessions.GetSessions();
+ }
CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
Response.BeginArray("sessions");
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- {
- WriteSessionInfo(Response, Session->Info());
- }
+ WriteSessionInfo(Response, Session->Info());
Response.EndObject();
}
Response.EndArray();
@@ -187,30 +253,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kPost:
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- CbObject RequestObject;
-
- if (Payload.GetSize() > 0)
- {
- if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All);
- ValidationResult != CbValidateError::None)
- {
- m_SessionsStats.BadRequestCount++;
- return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Invalid payload: {}", zen::ToString(ValidationResult)));
- }
- RequestObject = LoadCompactBinaryObject(Payload);
- }
+ CbObject RequestObject = ServerRequest.ReadPayloadObject();
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
+ std::string Mode(RequestObject["mode"sv].AsString());
Oid JobId = RequestObject["jobid"sv].AsObjectId();
CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
{
return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
}
@@ -265,4 +318,255 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
}
}
+//////////////////////////////////////////////////////////////////////////
+//
+// Session log
+//
+
+static void
+WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry)
+{
+ Writer << "timestamp" << Entry.Timestamp;
+ if (!Entry.Level.empty())
+ {
+ Writer << "level" << Entry.Level;
+ }
+ if (!Entry.Message.empty())
+ {
+ Writer << "message" << Entry.Message;
+ }
+ if (Entry.Data.GetSize() > 0)
+ {
+ Writer.AddObject("data"sv, Entry.Data);
+ }
+}
+
+void
+HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
+{
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+
+ const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1));
+ if (SessionId == Oid::Zero)
+ {
+ m_SessionsStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid session id '{}'", Req.GetCapture(1)));
+ }
+
+ m_SessionsStats.RequestCount++;
+
+ Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId);
+ if (!Session)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Session '{}' not found", SessionId));
+ }
+
+ if (ServerRequest.RequestVerb() == HttpVerb::kPost)
+ {
+ m_SessionsStats.SessionWriteCount++;
+
+ if (ServerRequest.RequestContentType() == HttpContentType::kText)
+ {
+ // Raw text — split by newlines, one entry per line
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ const DateTime Now = DateTime::Now();
+
+ size_t Pos = 0;
+ while (Pos < Text.size())
+ {
+ size_t End = Text.find('\n', Pos);
+ if (End == std::string_view::npos)
+ {
+ End = Text.size();
+ }
+
+ std::string_view Line = Text.substr(Pos, End - Pos);
+ // Strip trailing \r
+ if (!Line.empty() && Line.back() == '\r')
+ {
+ Line.remove_suffix(1);
+ }
+
+ if (!Line.empty())
+ {
+ Session->AppendLog(SessionsService::LogEntry{
+ .Timestamp = Now,
+ .Message = std::string(Line),
+ });
+ }
+
+ Pos = End + 1;
+ }
+ }
+ else
+ {
+ // Structured log (JSON or CbObject)
+ // Accepts a single record or an "entries" array of records
+ CbObject RequestObject = ServerRequest.ReadPayloadObject();
+ const DateTime Now = DateTime::Now();
+
+ auto AppendFromObject = [&](CbObjectView Obj) {
+ std::string Level(Obj["level"sv].AsString());
+ std::string Message(Obj["message"sv].AsString());
+ CbObjectView DataView = Obj["data"sv].AsObjectView();
+
+ Session->AppendLog(SessionsService::LogEntry{
+ .Timestamp = Now,
+ .Level = std::move(Level),
+ .Message = std::move(Message),
+ .Data = CbObject::Clone(DataView),
+ });
+ };
+
+ CbFieldView EntriesField = RequestObject["entries"sv];
+ if (EntriesField.IsArray())
+ {
+ for (CbFieldView Entry : EntriesField)
+ {
+ AppendFromObject(Entry.AsObjectView());
+ }
+ }
+ else
+ {
+ AppendFromObject(RequestObject);
+ }
+ }
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ // GET - return log entries
+ m_SessionsStats.SessionReadCount++;
+
+ HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams();
+
+ // cursor-based retrieval: client passes the cursor from the previous response
+ // and receives only entries appended since then.
+ std::string_view CursorStr = Params.GetValue("cursor"sv);
+ if (!CursorStr.empty())
+ {
+ uint64_t AfterCursor = std::strtoull(std::string(CursorStr).c_str(), nullptr, 10);
+
+ SessionsService::Session::CursorResult Result = Session->GetLogEntriesAfter(AfterCursor);
+
+ CbObjectWriter Response;
+ Response << "cursor" << Result.Cursor;
+ Response << "count" << Result.Count;
+ Response.BeginArray("entries");
+ for (const SessionsService::LogEntry& Entry : Result.Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+
+ // Legacy offset/limit retrieval
+ uint32_t Limit = 0;
+ uint32_t Offset = 0;
+
+ if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty())
+ {
+ Limit = uint32_t(std::strtoul(std::string(LimitStr).c_str(), nullptr, 10));
+ }
+ if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty())
+ {
+ Offset = uint32_t(std::strtoul(std::string(OffsetStr).c_str(), nullptr, 10));
+ }
+
+ std::vector<SessionsService::LogEntry> Entries = Session->GetLogEntries(Limit, Offset);
+
+ CbObjectWriter Response;
+ Response << "total" << Session->GetLogCount();
+ Response.BeginArray("entries");
+ for (const SessionsService::LogEntry& Entry : Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// WebSocket push
+//
+
+void
+HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_INFO("Sessions WebSocket client connected");
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+}
+
+void
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+{
+ // No client-to-server messages expected
+}
+
+void
+HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
+
+void
+HttpSessionsService::BroadcastSessions()
+{
+ std::vector<Ref<WebSocketConnection>> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ ExtendableStringBuilder<4096> JsonBuilder;
+ BuildSessionListResponse().ToJson(JsonBuilder);
+ std::string_view Json = JsonBuilder.ToView();
+
+ for (const Ref<WebSocketConnection>& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText(Json);
+ }
+ }
+}
+
+void
+HttpSessionsService::EnqueuePushTimer()
+{
+ m_PushTimer.expires_after(std::chrono::seconds(2));
+ m_PushTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+
+ BroadcastSessions();
+ EnqueuePushTimer();
+ });
+}
+
} // namespace zen