// Copyright Epic Games, Inc. All Rights Reserved. #include "httpsessions.h" #include #include #include #include #include "sessions.h" namespace zen { using namespace std::literals; HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions, asio::io_context& IoContext) : m_Log(logging::Get("sessions")) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_Sessions(Sessions) , m_PushTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { m_PushTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } const char* HttpSessionsService::BaseUri() const { return "/sessions/"; } void HttpSessionsService::HandleRequest(HttpServerRequest& Request) { metrics::OperationTiming::Scope $(m_HttpRequests); if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); } } CbObject HttpSessionsService::CollectStats() { ZEN_TRACE_CPU("SessionsService::Stats"); CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); Cbo.BeginObject("sessions"); { Cbo << "readcount" << m_SessionsStats.SessionReadCount; Cbo << "writecount" << m_SessionsStats.SessionWriteCount; Cbo << "deletecount" << m_SessionsStats.SessionDeleteCount; Cbo << "listcount" << m_SessionsStats.SessionListCount; Cbo << "requestcount" << m_SessionsStats.RequestCount; Cbo << "badrequestcount" << m_SessionsStats.BadRequestCount; Cbo << "count" << m_Sessions.GetSessionCount(); } Cbo.EndObject(); return Cbo.Save(); } void HttpSessionsService::HandleStatsRequest(HttpServerRequest& HttpReq) { HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats()); } void HttpSessionsService::HandleStatusRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpSessionsService::Status"); CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpSessionsService::Initialize() { using namespace std::literals; ZEN_INFO("Initializing Sessions Service"); static constexpr AsciiSet ValidHexCharactersSet{"0123456789abcdefABCDEF"}; m_Router.AddMatcher("session_id", [](std::string_view Str) -> bool { return Str.length() == Oid::StringLength && AsciiSet::HasOnly(Str, ValidHexCharactersSet); }); m_Router.RegisterRoute( "list", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{session_id}", [this](HttpRouterRequest& Req) { SessionRequest(Req); }, HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete); m_Router.RegisterRoute( "{session_id}/log", [this](HttpRouterRequest& Req) { SessionLogRequest(Req); }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "ws", [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, HttpVerb::kGet); m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); EnqueuePushTimer(); } static void WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "id" << Info.Id; if (!Info.AppName.empty()) { Writer << "appname" << Info.AppName; } if (!Info.Mode.empty()) { Writer << "mode" << Info.Mode; } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "created_at" << Info.CreatedAt; Writer << "updated_at" << Info.UpdatedAt; if (Info.EndedAt.GetTicks() != 0) { Writer << "ended_at" << Info.EndedAt; } if (Info.Metadata.GetSize() > 0) { Writer.AddObject("metadata"sv, Info.Metadata); } } CbObject HttpSessionsService::BuildSessionListResponse() { std::vector> Active = m_Sessions.GetSessions(); std::vector> Ended = m_Sessions.GetEndedSessions(); CbObjectWriter Response; if (m_SelfSessionId != Oid::Zero) { Response << "self_id" << m_SelfSessionId; } Response.BeginArray("sessions"); for (const Ref& Session : Active) { Response.BeginObject(); WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } for (const Ref& Session : Ended) { Response.BeginObject(); WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } Response.EndArray(); return Response.Save(); } void HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); m_SessionsStats.SessionListCount++; m_SessionsStats.RequestCount++; HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); std::string_view Status = Params.GetValue("status"sv); std::vector> Sessions; if (Status == "ended"sv) { Sessions = m_Sessions.GetEndedSessions(); } else if (Status == "all"sv) { Sessions = m_Sessions.GetSessions(); std::vector> Ended = m_Sessions.GetEndedSessions(); Sessions.insert(Sessions.end(), Ended.begin(), Ended.end()); } else { Sessions = m_Sessions.GetSessions(); } CbObjectWriter Response; if (m_SelfSessionId != Oid::Zero) { Response << "self_id" << m_SelfSessionId; } Response.BeginArray("sessions"); for (const Ref& Session : Sessions) { Response.BeginObject(); WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } Response.EndArray(); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } void HttpSessionsService::SessionRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); if (SessionId == Oid::Zero) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid session id '{}'", Req.GetCapture(1))); } m_SessionsStats.RequestCount++; switch (ServerRequest.RequestVerb()) { case HttpVerb::kPost: case HttpVerb::kPut: { CbObject RequestObject = ServerRequest.ReadPayloadObject(); if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); std::string Mode(RequestObject["mode"sv].AsString()); Oid JobId = RequestObject["jobid"sv].AsObjectId(); CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); m_SessionsStats.SessionWriteCount++; if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); } else { // Already exists - try update instead if (m_Sessions.UpdateSession(SessionId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); } return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError); } } else { // PUT - update only m_SessionsStats.SessionWriteCount++; if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView())) { return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Session '{}' not found", SessionId)); } } case HttpVerb::kGet: { m_SessionsStats.SessionReadCount++; Ref Session = m_Sessions.GetSession(SessionId); if (Session) { CbObjectWriter Response; WriteSessionInfo(Response, Session->Info()); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); } case HttpVerb::kDelete: { m_SessionsStats.SessionDeleteCount++; if (m_Sessions.RemoveSession(SessionId)) { return ServerRequest.WriteResponse(HttpResponseCode::OK); } return ServerRequest.WriteResponse(HttpResponseCode::NotFound); } default: { return ServerRequest.WriteResponse(HttpResponseCode::MethodNotAllowed); } } } ////////////////////////////////////////////////////////////////////////// // // Session log // static void WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) { Writer << "timestamp" << Entry.Timestamp; if (!Entry.Level.empty()) { Writer << "level" << Entry.Level; } if (!Entry.Message.empty()) { Writer << "message" << Entry.Message; } if (Entry.Data.GetSize() > 0) { Writer.AddObject("data"sv, Entry.Data); } } void HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); if (SessionId == Oid::Zero) { m_SessionsStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid session id '{}'", Req.GetCapture(1))); } m_SessionsStats.RequestCount++; Ref Session = m_Sessions.GetSession(SessionId); if (!Session) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Session '{}' not found", SessionId)); } if (ServerRequest.RequestVerb() == HttpVerb::kPost) { m_SessionsStats.SessionWriteCount++; if (ServerRequest.RequestContentType() == HttpContentType::kText) { // Raw text — split by newlines, one entry per line IoBuffer Payload = ServerRequest.ReadPayload(); std::string_view Text(reinterpret_cast(Payload.GetData()), Payload.GetSize()); const DateTime Now = DateTime::Now(); size_t Pos = 0; while (Pos < Text.size()) { size_t End = Text.find('\n', Pos); if (End == std::string_view::npos) { End = Text.size(); } std::string_view Line = Text.substr(Pos, End - Pos); // Strip trailing \r if (!Line.empty() && Line.back() == '\r') { Line.remove_suffix(1); } if (!Line.empty()) { Session->AppendLog(SessionsService::LogEntry{ .Timestamp = Now, .Message = std::string(Line), }); } Pos = End + 1; } } else { // Structured log (JSON or CbObject) // Accepts a single record or an "entries" array of records CbObject RequestObject = ServerRequest.ReadPayloadObject(); const DateTime Now = DateTime::Now(); auto AppendFromObject = [&](CbObjectView Obj) { std::string Level(Obj["level"sv].AsString()); std::string Message(Obj["message"sv].AsString()); CbObjectView DataView = Obj["data"sv].AsObjectView(); Session->AppendLog(SessionsService::LogEntry{ .Timestamp = Now, .Level = std::move(Level), .Message = std::move(Message), .Data = CbObject::Clone(DataView), }); }; CbFieldView EntriesField = RequestObject["entries"sv]; if (EntriesField.IsArray()) { for (CbFieldView Entry : EntriesField) { AppendFromObject(Entry.AsObjectView()); } } else { AppendFromObject(RequestObject); } } return ServerRequest.WriteResponse(HttpResponseCode::OK); } else { // GET - return log entries m_SessionsStats.SessionReadCount++; HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); uint32_t Limit = 0; uint32_t Offset = 0; if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty()) { Limit = uint32_t(std::strtoul(std::string(LimitStr).c_str(), nullptr, 10)); } if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty()) { Offset = uint32_t(std::strtoul(std::string(OffsetStr).c_str(), nullptr, 10)); } std::vector Entries = Session->GetLogEntries(Limit, Offset); CbObjectWriter Response; Response << "total" << Session->GetLogCount(); Response.BeginArray("entries"); for (const SessionsService::LogEntry& Entry : Entries) { Response.BeginObject(); WriteLogEntry(Response, Entry); Response.EndObject(); } Response.EndArray(); return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); } } ////////////////////////////////////////////////////////////////////////// // // WebSocket push // void HttpSessionsService::OnWebSocketOpen(Ref Connection) { ZEN_INFO("Sessions WebSocket client connected"); m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); } void HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) { // No client-to-server messages expected } void HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) { ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref& C) { return C.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); } void HttpSessionsService::BroadcastSessions() { std::vector> Connections; m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); if (Connections.empty()) { return; } ExtendableStringBuilder<4096> JsonBuilder; BuildSessionListResponse().ToJson(JsonBuilder); std::string_view Json = JsonBuilder.ToView(); for (const Ref& Conn : Connections) { if (Conn->IsOpen()) { Conn->SendText(Json); } } } void HttpSessionsService::EnqueuePushTimer() { m_PushTimer.expires_after(std::chrono::seconds(2)); m_PushTimer.async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } BroadcastSessions(); EnqueuePushTimer(); }); } } // namespace zen