diff options
Diffstat (limited to 'src/zenserver/sessions')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 360 | ||||
| -rw-r--r-- | src/zenserver/sessions/httpsessions.h | 33 | ||||
| -rw-r--r-- | src/zenserver/sessions/inprocsessionlogsink.cpp | 39 | ||||
| -rw-r--r-- | src/zenserver/sessions/inprocsessionlogsink.h | 30 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.cpp | 141 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.h | 45 |
6 files changed, 590 insertions, 58 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp index 6cf12bea4..429ba98cf 100644 --- a/src/zenserver/sessions/httpsessions.cpp +++ b/src/zenserver/sessions/httpsessions.cpp @@ -3,7 +3,6 @@ #include "httpsessions.h" #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/trace.h> @@ -12,17 +11,22 @@ namespace zen { using namespace std::literals; -HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions) +HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + SessionsService& Sessions, + asio::io_context& IoContext) : m_Log(logging::Get("sessions")) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_Sessions(Sessions) +, m_PushTimer(IoContext) { Initialize(); } HttpSessionsService::~HttpSessionsService() { + m_PushTimer.cancel(); m_StatsService.UnregisterHandler("sessions", *this); m_StatusService.UnregisterHandler("sessions", *this); } @@ -107,12 +111,24 @@ HttpSessionsService::Initialize() HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete); m_Router.RegisterRoute( + "{session_id}/log", + [this](HttpRouterRequest& Req) { SessionLogRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, HttpVerb::kGet); + m_Router.RegisterRoute( + "ws", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, + HttpVerb::kGet); + m_StatsService.RegisterHandler("sessions", *this); m_StatusService.RegisterHandler("sessions", *this); + + EnqueuePushTimer(); } static void @@ -123,24 +139,55 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) { Writer << "appname" << Info.AppName; } + if (!Info.Mode.empty()) + { + Writer << "mode" << Info.Mode; + } if (Info.JobId != Oid::Zero) { Writer << "jobid" << Info.JobId; } Writer << "created_at" << Info.CreatedAt; Writer << "updated_at" << Info.UpdatedAt; + if (Info.EndedAt.GetTicks() != 0) + { + Writer << "ended_at" << Info.EndedAt; + } if (Info.Metadata.GetSize() > 0) { - Writer.BeginObject("metadata"); - for (const CbField& Field : Info.Metadata) - { - Writer.AddField(Field); - } - Writer.EndObject(); + Writer.AddObject("metadata"sv, Info.Metadata); } } +CbObject +HttpSessionsService::BuildSessionListResponse() +{ + std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + + CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } + Response.BeginArray("sessions"); + for (const Ref<SessionsService::Session>& Session : Active) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); + } + for (const Ref<SessionsService::Session>& Session : Ended) + { + Response.BeginObject(); + WriteSessionInfo(Response, Session->Info()); + Response.EndObject(); + } + Response.EndArray(); + return Response.Save(); +} + void HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) { @@ -149,16 +196,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) m_SessionsStats.SessionListCount++; m_SessionsStats.RequestCount++; - std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions(); + HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); + std::string_view Status = Params.GetValue("status"sv); + + std::vector<Ref<SessionsService::Session>> Sessions; + if (Status == "ended"sv) + { + Sessions = m_Sessions.GetEndedSessions(); + } + else if (Status == "all"sv) + { + Sessions = m_Sessions.GetSessions(); + std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions(); + Sessions.insert(Sessions.end(), Ended.begin(), Ended.end()); + } + else + { + Sessions = m_Sessions.GetSessions(); + } CbObjectWriter Response; + if (m_SelfSessionId != Oid::Zero) + { + Response << "self_id" << m_SelfSessionId; + } Response.BeginArray("sessions"); for (const Ref<SessionsService::Session>& Session : Sessions) { Response.BeginObject(); - { - WriteSessionInfo(Response, Session->Info()); - } + WriteSessionInfo(Response, Session->Info()); Response.EndObject(); } Response.EndArray(); @@ -187,30 +253,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) case HttpVerb::kPost: case HttpVerb::kPut: { - IoBuffer Payload = ServerRequest.ReadPayload(); - CbObject RequestObject; - - if (Payload.GetSize() > 0) - { - if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All); - ValidationResult != CbValidateError::None) - { - m_SessionsStats.BadRequestCount++; - return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Invalid payload: {}", zen::ToString(ValidationResult))); - } - RequestObject = LoadCompactBinaryObject(Payload); - } + CbObject RequestObject = ServerRequest.ReadPayloadObject(); if (ServerRequest.RequestVerb() == HttpVerb::kPost) { std::string AppName(RequestObject["appname"sv].AsString()); + std::string Mode(RequestObject["mode"sv].AsString()); Oid JobId = RequestObject["jobid"sv].AsObjectId(); CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); m_SessionsStats.SessionWriteCount++; - if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView)) + if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView)) { return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); } @@ -265,4 +318,255 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req) } } +////////////////////////////////////////////////////////////////////////// +// +// Session log +// + +static void +WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry) +{ + Writer << "timestamp" << Entry.Timestamp; + if (!Entry.Level.empty()) + { + Writer << "level" << Entry.Level; + } + if (!Entry.Message.empty()) + { + Writer << "message" << Entry.Message; + } + if (Entry.Data.GetSize() > 0) + { + Writer.AddObject("data"sv, Entry.Data); + } +} + +void +HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + + const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); + if (SessionId == Oid::Zero) + { + m_SessionsStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid session id '{}'", Req.GetCapture(1))); + } + + m_SessionsStats.RequestCount++; + + Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId); + if (!Session) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Session '{}' not found", SessionId)); + } + + if (ServerRequest.RequestVerb() == HttpVerb::kPost) + { + m_SessionsStats.SessionWriteCount++; + + if (ServerRequest.RequestContentType() == HttpContentType::kText) + { + // Raw text — split by newlines, one entry per line + IoBuffer Payload = ServerRequest.ReadPayload(); + std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + const DateTime Now = DateTime::Now(); + + size_t Pos = 0; + while (Pos < Text.size()) + { + size_t End = Text.find('\n', Pos); + if (End == std::string_view::npos) + { + End = Text.size(); + } + + std::string_view Line = Text.substr(Pos, End - Pos); + // Strip trailing \r + if (!Line.empty() && Line.back() == '\r') + { + Line.remove_suffix(1); + } + + if (!Line.empty()) + { + Session->AppendLog(SessionsService::LogEntry{ + .Timestamp = Now, + .Message = std::string(Line), + }); + } + + Pos = End + 1; + } + } + else + { + // Structured log (JSON or CbObject) + // Accepts a single record or an "entries" array of records + CbObject RequestObject = ServerRequest.ReadPayloadObject(); + const DateTime Now = DateTime::Now(); + + auto AppendFromObject = [&](CbObjectView Obj) { + std::string Level(Obj["level"sv].AsString()); + std::string Message(Obj["message"sv].AsString()); + CbObjectView DataView = Obj["data"sv].AsObjectView(); + + Session->AppendLog(SessionsService::LogEntry{ + .Timestamp = Now, + .Level = std::move(Level), + .Message = std::move(Message), + .Data = CbObject::Clone(DataView), + }); + }; + + CbFieldView EntriesField = RequestObject["entries"sv]; + if (EntriesField.IsArray()) + { + for (CbFieldView Entry : EntriesField) + { + AppendFromObject(Entry.AsObjectView()); + } + } + else + { + AppendFromObject(RequestObject); + } + } + + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + // GET - return log entries + m_SessionsStats.SessionReadCount++; + + HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams(); + + // cursor-based retrieval: client passes the cursor from the previous response + // and receives only entries appended since then. + std::string_view CursorStr = Params.GetValue("cursor"sv); + if (!CursorStr.empty()) + { + uint64_t AfterCursor = std::strtoull(std::string(CursorStr).c_str(), nullptr, 10); + + SessionsService::Session::CursorResult Result = Session->GetLogEntriesAfter(AfterCursor); + + CbObjectWriter Response; + Response << "cursor" << Result.Cursor; + Response << "count" << Result.Count; + Response.BeginArray("entries"); + for (const SessionsService::LogEntry& Entry : Result.Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + + // Legacy offset/limit retrieval + uint32_t Limit = 0; + uint32_t Offset = 0; + + if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty()) + { + Limit = uint32_t(std::strtoul(std::string(LimitStr).c_str(), nullptr, 10)); + } + if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty()) + { + Offset = uint32_t(std::strtoul(std::string(OffsetStr).c_str(), nullptr, 10)); + } + + std::vector<SessionsService::LogEntry> Entries = Session->GetLogEntries(Limit, Offset); + + CbObjectWriter Response; + Response << "total" << Session->GetLogCount(); + Response.BeginArray("entries"); + for (const SessionsService::LogEntry& Entry : Entries) + { + Response.BeginObject(); + WriteLogEntry(Response, Entry); + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// WebSocket push +// + +void +HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_INFO("Sessions WebSocket client connected"); + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); +} + +void +HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +{ + // No client-to-server messages expected +} + +void +HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code); + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} + +void +HttpSessionsService::BroadcastSessions() +{ + std::vector<Ref<WebSocketConnection>> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + + if (Connections.empty()) + { + return; + } + + ExtendableStringBuilder<4096> JsonBuilder; + BuildSessionListResponse().ToJson(JsonBuilder); + std::string_view Json = JsonBuilder.ToView(); + + for (const Ref<WebSocketConnection>& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendText(Json); + } + } +} + +void +HttpSessionsService::EnqueuePushTimer() +{ + m_PushTimer.expires_after(std::chrono::seconds(2)); + m_PushTimer.async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + BroadcastSessions(); + EnqueuePushTimer(); + }); +} + } // namespace zen diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h index e07f3b59b..a5783a46b 100644 --- a/src/zenserver/sessions/httpsessions.h +++ b/src/zenserver/sessions/httpsessions.h @@ -5,16 +5,25 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> #include <zenhttp/httpstatus.h> +#include <zenhttp/websocket.h> #include <zentelemetry/stats.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio/io_context.hpp> +#include <asio/steady_timer.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class SessionsService; -class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider +class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler { public: - HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions); + HttpSessionsService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + SessionsService& Sessions, + asio::io_context& IoContext); virtual ~HttpSessionsService(); virtual const char* BaseUri() const override; @@ -24,6 +33,13 @@ public: virtual void HandleStatsRequest(HttpServerRequest& Request) override; virtual void HandleStatusRequest(HttpServerRequest& Request) override; + void SetSelfSessionId(const Oid& Id) { m_SelfSessionId = Id; } + + // IWebSocketHandler + void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override; + void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override; + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override; + private: struct SessionsStats { @@ -43,6 +59,7 @@ private: void ListSessionsRequest(HttpRouterRequest& Req); void SessionRequest(HttpRouterRequest& Req); + void SessionLogRequest(HttpRouterRequest& Req); HttpStatusService& m_StatusService; HttpStatsService& m_StatsService; @@ -50,6 +67,18 @@ private: SessionsService& m_Sessions; SessionsStats m_SessionsStats; metrics::OperationTiming m_HttpRequests; + + // WebSocket push + RwLock m_WsConnectionsLock; + std::vector<Ref<WebSocketConnection>> m_WsConnections; + asio::steady_timer m_PushTimer; + + void BroadcastSessions(); + void EnqueuePushTimer(); + + Oid m_SelfSessionId = Oid::Zero; + + CbObject BuildSessionListResponse(); }; } // namespace zen diff --git a/src/zenserver/sessions/inprocsessionlogsink.cpp b/src/zenserver/sessions/inprocsessionlogsink.cpp new file mode 100644 index 000000000..9982859b6 --- /dev/null +++ b/src/zenserver/sessions/inprocsessionlogsink.cpp @@ -0,0 +1,39 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "inprocsessionlogsink.h" + +#include <zencore/logbase.h> + +namespace zen { + +/// Bias in seconds from DateTime epoch (year 1) to Unix epoch (1970). +static constexpr uint64_t UnixEpochBiasSeconds = uint64_t(double(1970 - 1) * 365.2425) * 86400; + +static DateTime +TimePointToDateTime(logging::LogClock::time_point Time) +{ + auto Duration = Time.time_since_epoch(); + auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration); + uint64_t Ticks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond; + return DateTime{Ticks}; +} + +void +InProcSessionLogSink::Log(const logging::LogMessage& Msg) +{ + Ref<SessionsService::Session> Session = m_Service.GetSession(m_SessionId); + if (!Session) + { + return; + } + + SessionsService::LogEntry Entry{ + .Timestamp = TimePointToDateTime(Msg.GetTime()), + .Level = std::string(logging::ToStringView(Msg.GetLevel())), + .Message = std::string(Msg.GetPayload()), + }; + + Session->AppendLog(std::move(Entry)); +} + +} // namespace zen diff --git a/src/zenserver/sessions/inprocsessionlogsink.h b/src/zenserver/sessions/inprocsessionlogsink.h new file mode 100644 index 000000000..15f9b5ec3 --- /dev/null +++ b/src/zenserver/sessions/inprocsessionlogsink.h @@ -0,0 +1,30 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "sessions.h" + +#include <zencore/logging/sink.h> +#include <zencore/session.h> + +#include <mutex> + +namespace zen { + +/// Log sink that forwards log messages to the server's own session +/// in the SessionsService, making them visible in the sessions browser UI. +class InProcSessionLogSink : public logging::Sink +{ +public: + explicit InProcSessionLogSink(SessionsService& Service) : m_Service(Service), m_SessionId(GetSessionId()) {} + + void Log(const logging::LogMessage& Msg) override; + void Flush() override {} + void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override {} + +private: + SessionsService& m_Service; + Oid m_SessionId; +}; + +} // namespace zen diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp index d919db6e9..1212ba5d8 100644 --- a/src/zenserver/sessions/sessions.cpp +++ b/src/zenserver/sessions/sessions.cpp @@ -46,6 +46,78 @@ SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info) } SessionsService::Session::~Session() = default; +void +SessionsService::Session::AppendLog(LogEntry Entry) +{ + RwLock::ExclusiveLockScope Lock(m_LogLock); + m_LogEntries.push_back(std::move(Entry)); + ++m_TotalAppended; + while (m_LogEntries.size() > MaxLogEntries) + { + m_LogEntries.pop_front(); + } +} + +std::vector<SessionsService::LogEntry> +SessionsService::Session::GetLogEntries(uint32_t Limit, uint32_t Offset) const +{ + RwLock::SharedLockScope Lock(m_LogLock); + + const uint32_t Total = uint32_t(m_LogEntries.size()); + if (Offset >= Total) + { + return {}; + } + + const uint32_t Available = Total - Offset; + const uint32_t Count = (Limit > 0) ? std::min(Limit, Available) : Available; + + std::vector<LogEntry> Result; + Result.reserve(Count); + for (uint32_t i = Offset; i < Offset + Count; i++) + { + Result.push_back(m_LogEntries[i]); + } + return Result; +} + +uint64_t +SessionsService::Session::GetLogCount() const +{ + RwLock::SharedLockScope Lock(m_LogLock); + return m_LogEntries.size(); +} + +SessionsService::Session::CursorResult +SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const +{ + RwLock::SharedLockScope Lock(m_LogLock); + + const uint64_t DequeSize = m_LogEntries.size(); + + // Cursor 0 means "give me everything currently in the deque". + // Otherwise, compute how many new entries were appended since the cursor. + uint64_t NewCount = (AfterCursor == 0) ? DequeSize : (m_TotalAppended > AfterCursor ? m_TotalAppended - AfterCursor : 0); + + // Clamp to what's actually available in the deque (entries may have been evicted). + NewCount = std::min(NewCount, DequeSize); + + std::vector<LogEntry> Result; + Result.reserve(NewCount); + + const uint64_t StartIndex = DequeSize - NewCount; + for (uint64_t i = StartIndex; i < DequeSize; i++) + { + Result.push_back(m_LogEntries[i]); + } + + return CursorResult{ + .Entries = std::move(Result), + .Cursor = m_TotalAppended, + .Count = DequeSize, + }; +} + ////////////////////////////////////////////////////////////////////////// SessionsService::SessionsService() : m_Log(logging::Get("sessions")) @@ -55,25 +127,31 @@ SessionsService::SessionsService() : m_Log(logging::Get("sessions")) SessionsService::~SessionsService() = default; bool -SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata) +SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata) { - RwLock::ExclusiveLockScope Lock(m_Lock); - - if (m_Sessions.contains(SessionId)) + // Log outside the lock scope — InProcSessionLogSink calls back into + // GetSession() which acquires m_Lock shared, so logging while holding + // m_Lock exclusively would deadlock. { - return false; + RwLock::ExclusiveLockScope Lock(m_Lock); + + if (m_Sessions.contains(SessionId)) + { + return false; + } + + const DateTime Now = DateTime::Now(); + m_Sessions.emplace(SessionId, + Ref(new Session(SessionInfo{.Id = SessionId, + .AppName = AppName, + .Mode = Mode, + .JobId = JobId, + .Metadata = CbObject::Clone(Metadata), + .CreatedAt = Now, + .UpdatedAt = Now}))); } - ZEN_INFO("Session {} registered (AppName: {}, JobId: {})", SessionId, AppName, JobId); - - const DateTime Now = DateTime::Now(); - m_Sessions.emplace(SessionId, - Ref(new Session(SessionInfo{.Id = SessionId, - .AppName = std::move(AppName), - .JobId = JobId, - .Metadata = CbObject::Clone(Metadata), - .CreatedAt = Now, - .UpdatedAt = Now}))); + ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId); return true; } @@ -126,20 +204,39 @@ SessionsService::GetSessions() const bool SessionsService::RemoveSession(const Oid& SessionId) { - RwLock::ExclusiveLockScope Lock(m_Lock); + std::string RemovedAppName; + Oid RemovedJobId; - auto It = m_Sessions.find(SessionId); - if (It == m_Sessions.end()) { - return false; - } + RwLock::ExclusiveLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return false; + } + + RemovedAppName = It.value()->Info().AppName; + RemovedJobId = It.value()->Info().JobId; + + Ref<Session> Ended = It.value(); + Ended->SetEndedAt(DateTime::Now()); + m_EndedSessions.push_back(std::move(Ended)); - ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, It.value()->Info().AppName, It.value()->Info().JobId); + m_Sessions.erase(It); + } - m_Sessions.erase(It); + ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId); return true; } +std::vector<Ref<SessionsService::Session>> +SessionsService::GetEndedSessions() const +{ + RwLock::SharedLockScope Lock(m_Lock); + return m_EndedSessions; +} + uint64_t SessionsService::GetSessionCount() const { diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h index db9704430..8f07bfc31 100644 --- a/src/zenserver/sessions/sessions.h +++ b/src/zenserver/sessions/sessions.h @@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <deque> #include <optional> #include <string> #include <vector> @@ -33,10 +34,20 @@ public: { Oid Id; std::string AppName; + std::string Mode; Oid JobId; CbObject Metadata; DateTime CreatedAt; DateTime UpdatedAt; + DateTime EndedAt{0}; + }; + + struct LogEntry + { + DateTime Timestamp; + std::string Level; + std::string Message; + CbObject Data; }; class Session : public TRefCounted<Session> @@ -51,23 +62,44 @@ public: const SessionInfo& Info() const { return m_Info; } void UpdateMetadata(CbObjectView Metadata) { - // Should this be additive rather than replacing the whole thing? We'll see. m_Info.Metadata = CbObject::Clone(Metadata); m_Info.UpdatedAt = DateTime::Now(); } + void SetEndedAt(DateTime When) { m_Info.EndedAt = When; } + + void AppendLog(LogEntry Entry); + std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const; + uint64_t GetLogCount() const; + + /// Returns entries appended after the given cursor and the new cursor value. + /// A cursor of 0 returns all entries currently in the deque. + struct CursorResult + { + std::vector<LogEntry> Entries; + uint64_t Cursor; // new cursor for next poll + uint64_t Count; // current deque size + }; + CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const; + private: - SessionInfo m_Info; - Ref<SessionLog> m_Log; + SessionInfo m_Info; + Ref<SessionLog> m_Log; + mutable RwLock m_LogLock; + std::deque<LogEntry> m_LogEntries; + uint64_t m_TotalAppended = 0; // monotonically increasing counter + + static constexpr uint32_t MaxLogEntries = 10000; }; SessionsService(); ~SessionsService(); - bool RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata); - bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); - Ref<Session> GetSession(const Oid& SessionId) const; + bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata); + bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); + Ref<Session> GetSession(const Oid& SessionId) const; std::vector<Ref<Session>> GetSessions() const; + std::vector<Ref<Session>> GetEndedSessions() const; bool RemoveSession(const Oid& SessionId); uint64_t GetSessionCount() const; @@ -77,6 +109,7 @@ private: LoggerRef m_Log; mutable RwLock m_Lock; tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions; + std::vector<Ref<Session>> m_EndedSessions; std::unique_ptr<SessionLogStore> m_SessionLogs; }; |