diff options
| author | Stefan Boberg <[email protected]> | 2026-03-13 12:09:03 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-03-13 12:09:03 +0100 |
| commit | 34ea559aa1e2606a2af0377a661d71f54d198cc5 (patch) | |
| tree | 99c390daefd122598766632049d27b97e6622f54 /src/zenserver/sessions/httpsessions.cpp | |
| parent | Add --no-network option (#831) (diff) | |
| download | zen-34ea559aa1e2606a2af0377a661d71f54d198cc5.tar.xz zen-34ea559aa1e2606a2af0377a661d71f54d198cc5.zip | |
Add SessionsServiceClient and enhanced sessions dashboard
- Add SessionsServiceClient in zenutil for announcing sessions to a
remote zenserver (POST/PUT/DELETE lifecycle with 15s heartbeat timer)
- Storage server registers itself into its own local sessions service
- Add session mode attribute coupled to server mode (Compute, Proxy, etc)
- Track ended sessions with ended_at timestamp and status filtering
- Sessions dashboard: master-detail layout with selectable rows,
Active/Ended/All tabs, metadata panel, live WebSocket updates,
paging, abbreviated date formatting, and "this" pill for self session
- Accept JSON payloads on sessions POST endpoint via ReadPayloadObject
- Add --sessions-url config option for remote session announcement
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 173 |
1 files changed, 145 insertions, 28 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp index 6cf12bea4..6996ce5b5 100644 --- a/src/zenserver/sessions/httpsessions.cpp +++ b/src/zenserver/sessions/httpsessions.cpp @@ -3,7 +3,6 @@ #include "httpsessions.h" #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/trace.h> @@ -12,17 +11,22 @@ namespace zen { using namespace std::literals; -HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions) +HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + SessionsService& Sessions, + asio::io_context& IoContext) : m_Log(logging::Get("sessions")) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_Sessions(Sessions) +, m_PushTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { + m_PushTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } @@ -111,8 +115,15 @@ HttpSessionsService::Initialize() [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); + m_Router.RegisterRoute( + "ws", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, + HttpVerb::kGet); + m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); + + EnqueuePushTimer(); } static void @@ -123,22 +134,53 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "appname" << Info.AppName; } + if (!Info.Mode.empty()) + { + Writer << "mode" << Info.Mode; + } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "created_at" << Info.CreatedAt; Writer << "updated_at" << Info.UpdatedAt; + if (Info.EndedAt.GetTicks() != 0) + { + Writer << "ended_at" << Info.EndedAt; + } if (Info.Metadata.GetSize() > 0) { - Writer.BeginObject("metadata"); - for (const CbField& Field : Info.Metadata) - { - Writer.AddField(Field); - } - Writer.EndObject(); + Writer.AddObject("metadata"sv, Info.Metadata); + } +} + +CbObject +HttpSessionsService::BuildSessionListResponse() +{ + std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + + CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } + Response.BeginArray("sessions"); + for (const Ref<SessionsService::Session>& Session : Active) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); + } + for (const Ref<SessionsService::Session>& Session : Ended) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); } + Response.EndArray(); + return Response.Save(); } void @@ -149,16 +191,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) m_SessionsStats.SessionListCount++; m_SessionsStats.RequestCount++; - std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions(); + HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); + std::string_view Status = Params.GetValue("status"sv); + + std::vector<Ref<SessionsService::Session>> Sessions; + if (Status == "ended"sv) + { + Sessions = m_Sessions.GetEndedSessions(); + } + else if (Status == "all"sv) + { + Sessions = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + Sessions.insert(Sessions.end(), Ended.begin(), Ended.end()); + } + else + { + Sessions = m_Sessions.GetSessions(); + } CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } Response.BeginArray("sessions"); for (const Ref<SessionsService::Session>& Session : Sessions) { Response.BeginObject(); - { - WriteSessionInfo(Response, Session->Info()); - } + WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } Response.EndArray(); @@ -187,30 +248,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) case HttpVerb::kPost: case HttpVerb::kPut: { - IoBuffer Payload = ServerRequest.ReadPayload(); - CbObject RequestObject; - - if (Payload.GetSize() > 0) - { - if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All); - ValidationResult != CbValidateError::None) - { - m_SessionsStats.BadRequestCount++; - return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Invalid payload: {}", zen::ToString(ValidationResult))); - } - RequestObject = LoadCompactBinaryObject(Payload); - } + CbObject RequestObject = ServerRequest.ReadPayloadObject(); if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); + std::string Mode(RequestObject["mode"sv].AsString()); Oid JobId = RequestObject["jobid"sv].AsObjectId(); CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); m_SessionsStats.SessionWriteCount++; - if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView)) + if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); } @@ -265,4 +313,73 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) } } +////////////////////////////////////////////////////////////////////////// +// +// WebSocket push +// + +void +HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_INFO("Sessions WebSocket client connected"); + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); +} + +void +HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +{ + // No client-to-server messages expected +} + +void +HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} + +void +HttpSessionsService::BroadcastSessions() +{ + std::vector<Ref<WebSocketConnection>> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + + if (Connections.empty()) + { + return; + } + + ExtendableStringBuilder<4096> JsonBuilder; + BuildSessionListResponse().ToJson(JsonBuilder); + std::string_view Json = JsonBuilder.ToView(); + + for (const Ref<WebSocketConnection>& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendText(Json); + } + } +} + +void +HttpSessionsService::EnqueuePushTimer() +{ + m_PushTimer.expires_after(std::chrono::seconds(2)); + m_PushTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + BroadcastSessions(); + EnqueuePushTimer(); + }); +} + } // namespace zen |