aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/sessions')
-rw-r--r--src/zenserver/sessions/httpsessions.cpp360
-rw-r--r--src/zenserver/sessions/httpsessions.h33
-rw-r--r--src/zenserver/sessions/inprocsessionlogsink.cpp39
-rw-r--r--src/zenserver/sessions/inprocsessionlogsink.h30
-rw-r--r--src/zenserver/sessions/sessions.cpp141
-rw-r--r--src/zenserver/sessions/sessions.h45
6 files changed, 590 insertions, 58 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 6cf12bea4..429ba98cf 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -3,7 +3,6 @@
#include "httpsessions.h"
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/trace.h>
@@ -12,17 +11,22 @@
namespace zen {
using namespace std::literals;
-HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions)
+HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
+ HttpStatsService& StatsService,
+ SessionsService& Sessions,
+ asio::io_context& IoContext)
: m_Log(logging::Get("sessions"))
, m_StatusService(StatusService)
, m_StatsService(StatsService)
, m_Sessions(Sessions)
+, m_PushTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ m_PushTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -107,12 +111,24 @@ HttpSessionsService::Initialize()
HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete);
m_Router.RegisterRoute(
+ "{session_id}/log",
+ [this](HttpRouterRequest& Req) { SessionLogRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"",
[this](HttpRouterRequest& Req) { ListSessionsRequest(Req); },
HttpVerb::kGet);
+ m_Router.RegisterRoute(
+ "ws",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
+ HttpVerb::kGet);
+
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+
+ EnqueuePushTimer();
}
static void
@@ -123,24 +139,55 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "appname" << Info.AppName;
}
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
}
Writer << "created_at" << Info.CreatedAt;
Writer << "updated_at" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ended_at" << Info.EndedAt;
+ }
if (Info.Metadata.GetSize() > 0)
{
- Writer.BeginObject("metadata");
- for (const CbField& Field : Info.Metadata)
- {
- Writer.AddField(Field);
- }
- Writer.EndObject();
+ Writer.AddObject("metadata"sv, Info.Metadata);
}
}
+CbObject
+HttpSessionsService::BuildSessionListResponse()
+{
+ std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+
+ CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
+ Response.BeginArray("sessions");
+ for (const Ref<SessionsService::Session>& Session : Active)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
+ }
+ for (const Ref<SessionsService::Session>& Session : Ended)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
+ }
+ Response.EndArray();
+ return Response.Save();
+}
+
void
HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
{
@@ -149,16 +196,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionListCount++;
m_SessionsStats.RequestCount++;
- std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions();
+ HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams();
+ std::string_view Status = Params.GetValue("status"sv);
+
+ std::vector<Ref<SessionsService::Session>> Sessions;
+ if (Status == "ended"sv)
+ {
+ Sessions = m_Sessions.GetEndedSessions();
+ }
+ else if (Status == "all"sv)
+ {
+ Sessions = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+ Sessions.insert(Sessions.end(), Ended.begin(), Ended.end());
+ }
+ else
+ {
+ Sessions = m_Sessions.GetSessions();
+ }
CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
Response.BeginArray("sessions");
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- {
- WriteSessionInfo(Response, Session->Info());
- }
+ WriteSessionInfo(Response, Session->Info());
Response.EndObject();
}
Response.EndArray();
@@ -187,30 +253,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kPost:
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- CbObject RequestObject;
-
- if (Payload.GetSize() > 0)
- {
- if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All);
- ValidationResult != CbValidateError::None)
- {
- m_SessionsStats.BadRequestCount++;
- return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Invalid payload: {}", zen::ToString(ValidationResult)));
- }
- RequestObject = LoadCompactBinaryObject(Payload);
- }
+ CbObject RequestObject = ServerRequest.ReadPayloadObject();
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
+ std::string Mode(RequestObject["mode"sv].AsString());
Oid JobId = RequestObject["jobid"sv].AsObjectId();
CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
{
return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
}
@@ -265,4 +318,255 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
}
}
+//////////////////////////////////////////////////////////////////////////
+//
+// Session log
+//
+
+static void
+WriteLogEntry(CbWriter& Writer, const SessionsService::LogEntry& Entry)
+{
+ Writer << "timestamp" << Entry.Timestamp;
+ if (!Entry.Level.empty())
+ {
+ Writer << "level" << Entry.Level;
+ }
+ if (!Entry.Message.empty())
+ {
+ Writer << "message" << Entry.Message;
+ }
+ if (Entry.Data.GetSize() > 0)
+ {
+ Writer.AddObject("data"sv, Entry.Data);
+ }
+}
+
+void
+HttpSessionsService::SessionLogRequest(HttpRouterRequest& Req)
+{
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+
+ const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1));
+ if (SessionId == Oid::Zero)
+ {
+ m_SessionsStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid session id '{}'", Req.GetCapture(1)));
+ }
+
+ m_SessionsStats.RequestCount++;
+
+ Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId);
+ if (!Session)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Session '{}' not found", SessionId));
+ }
+
+ if (ServerRequest.RequestVerb() == HttpVerb::kPost)
+ {
+ m_SessionsStats.SessionWriteCount++;
+
+ if (ServerRequest.RequestContentType() == HttpContentType::kText)
+ {
+ // Raw text — split by newlines, one entry per line
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ std::string_view Text(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ const DateTime Now = DateTime::Now();
+
+ size_t Pos = 0;
+ while (Pos < Text.size())
+ {
+ size_t End = Text.find('\n', Pos);
+ if (End == std::string_view::npos)
+ {
+ End = Text.size();
+ }
+
+ std::string_view Line = Text.substr(Pos, End - Pos);
+ // Strip trailing \r
+ if (!Line.empty() && Line.back() == '\r')
+ {
+ Line.remove_suffix(1);
+ }
+
+ if (!Line.empty())
+ {
+ Session->AppendLog(SessionsService::LogEntry{
+ .Timestamp = Now,
+ .Message = std::string(Line),
+ });
+ }
+
+ Pos = End + 1;
+ }
+ }
+ else
+ {
+ // Structured log (JSON or CbObject)
+ // Accepts a single record or an "entries" array of records
+ CbObject RequestObject = ServerRequest.ReadPayloadObject();
+ const DateTime Now = DateTime::Now();
+
+ auto AppendFromObject = [&](CbObjectView Obj) {
+ std::string Level(Obj["level"sv].AsString());
+ std::string Message(Obj["message"sv].AsString());
+ CbObjectView DataView = Obj["data"sv].AsObjectView();
+
+ Session->AppendLog(SessionsService::LogEntry{
+ .Timestamp = Now,
+ .Level = std::move(Level),
+ .Message = std::move(Message),
+ .Data = CbObject::Clone(DataView),
+ });
+ };
+
+ CbFieldView EntriesField = RequestObject["entries"sv];
+ if (EntriesField.IsArray())
+ {
+ for (CbFieldView Entry : EntriesField)
+ {
+ AppendFromObject(Entry.AsObjectView());
+ }
+ }
+ else
+ {
+ AppendFromObject(RequestObject);
+ }
+ }
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ // GET - return log entries
+ m_SessionsStats.SessionReadCount++;
+
+ HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams();
+
+ // cursor-based retrieval: client passes the cursor from the previous response
+ // and receives only entries appended since then.
+ std::string_view CursorStr = Params.GetValue("cursor"sv);
+ if (!CursorStr.empty())
+ {
+ uint64_t AfterCursor = std::strtoull(std::string(CursorStr).c_str(), nullptr, 10);
+
+ SessionsService::Session::CursorResult Result = Session->GetLogEntriesAfter(AfterCursor);
+
+ CbObjectWriter Response;
+ Response << "cursor" << Result.Cursor;
+ Response << "count" << Result.Count;
+ Response.BeginArray("entries");
+ for (const SessionsService::LogEntry& Entry : Result.Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+
+ // Legacy offset/limit retrieval
+ uint32_t Limit = 0;
+ uint32_t Offset = 0;
+
+ if (std::string_view LimitStr = Params.GetValue("limit"sv); !LimitStr.empty())
+ {
+ Limit = uint32_t(std::strtoul(std::string(LimitStr).c_str(), nullptr, 10));
+ }
+ if (std::string_view OffsetStr = Params.GetValue("offset"sv); !OffsetStr.empty())
+ {
+ Offset = uint32_t(std::strtoul(std::string(OffsetStr).c_str(), nullptr, 10));
+ }
+
+ std::vector<SessionsService::LogEntry> Entries = Session->GetLogEntries(Limit, Offset);
+
+ CbObjectWriter Response;
+ Response << "total" << Session->GetLogCount();
+ Response.BeginArray("entries");
+ for (const SessionsService::LogEntry& Entry : Entries)
+ {
+ Response.BeginObject();
+ WriteLogEntry(Response, Entry);
+ Response.EndObject();
+ }
+ Response.EndArray();
+
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// WebSocket push
+//
+
+void
+HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_INFO("Sessions WebSocket client connected");
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+}
+
+void
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+{
+ // No client-to-server messages expected
+}
+
+void
+HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
+
+void
+HttpSessionsService::BroadcastSessions()
+{
+ std::vector<Ref<WebSocketConnection>> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ ExtendableStringBuilder<4096> JsonBuilder;
+ BuildSessionListResponse().ToJson(JsonBuilder);
+ std::string_view Json = JsonBuilder.ToView();
+
+ for (const Ref<WebSocketConnection>& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText(Json);
+ }
+ }
+}
+
+void
+HttpSessionsService::EnqueuePushTimer()
+{
+ m_PushTimer.expires_after(std::chrono::seconds(2));
+ m_PushTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+
+ BroadcastSessions();
+ EnqueuePushTimer();
+ });
+}
+
} // namespace zen
diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h
index e07f3b59b..a5783a46b 100644
--- a/src/zenserver/sessions/httpsessions.h
+++ b/src/zenserver/sessions/httpsessions.h
@@ -5,16 +5,25 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
+#include <zenhttp/websocket.h>
#include <zentelemetry/stats.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio/io_context.hpp>
+#include <asio/steady_timer.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class SessionsService;
-class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
+class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider, public IWebSocketHandler
{
public:
- HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions);
+ HttpSessionsService(HttpStatusService& StatusService,
+ HttpStatsService& StatsService,
+ SessionsService& Sessions,
+ asio::io_context& IoContext);
virtual ~HttpSessionsService();
virtual const char* BaseUri() const override;
@@ -24,6 +33,13 @@ public:
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ void SetSelfSessionId(const Oid& Id) { m_SelfSessionId = Id; }
+
+ // IWebSocketHandler
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override;
+ void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override;
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override;
+
private:
struct SessionsStats
{
@@ -43,6 +59,7 @@ private:
void ListSessionsRequest(HttpRouterRequest& Req);
void SessionRequest(HttpRouterRequest& Req);
+ void SessionLogRequest(HttpRouterRequest& Req);
HttpStatusService& m_StatusService;
HttpStatsService& m_StatsService;
@@ -50,6 +67,18 @@ private:
SessionsService& m_Sessions;
SessionsStats m_SessionsStats;
metrics::OperationTiming m_HttpRequests;
+
+ // WebSocket push
+ RwLock m_WsConnectionsLock;
+ std::vector<Ref<WebSocketConnection>> m_WsConnections;
+ asio::steady_timer m_PushTimer;
+
+ void BroadcastSessions();
+ void EnqueuePushTimer();
+
+ Oid m_SelfSessionId = Oid::Zero;
+
+ CbObject BuildSessionListResponse();
};
} // namespace zen
diff --git a/src/zenserver/sessions/inprocsessionlogsink.cpp b/src/zenserver/sessions/inprocsessionlogsink.cpp
new file mode 100644
index 000000000..9982859b6
--- /dev/null
+++ b/src/zenserver/sessions/inprocsessionlogsink.cpp
@@ -0,0 +1,39 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "inprocsessionlogsink.h"
+
+#include <zencore/logbase.h>
+
+namespace zen {
+
+/// Bias in seconds from DateTime epoch (year 1) to Unix epoch (1970).
+static constexpr uint64_t UnixEpochBiasSeconds = uint64_t(double(1970 - 1) * 365.2425) * 86400;
+
+static DateTime
+TimePointToDateTime(logging::LogClock::time_point Time)
+{
+ auto Duration = Time.time_since_epoch();
+ auto Seconds = std::chrono::duration_cast<std::chrono::seconds>(Duration);
+ uint64_t Ticks = (UnixEpochBiasSeconds + static_cast<uint64_t>(Seconds.count())) * TimeSpan::TicksPerSecond;
+ return DateTime{Ticks};
+}
+
+void
+InProcSessionLogSink::Log(const logging::LogMessage& Msg)
+{
+ Ref<SessionsService::Session> Session = m_Service.GetSession(m_SessionId);
+ if (!Session)
+ {
+ return;
+ }
+
+ SessionsService::LogEntry Entry{
+ .Timestamp = TimePointToDateTime(Msg.GetTime()),
+ .Level = std::string(logging::ToStringView(Msg.GetLevel())),
+ .Message = std::string(Msg.GetPayload()),
+ };
+
+ Session->AppendLog(std::move(Entry));
+}
+
+} // namespace zen
diff --git a/src/zenserver/sessions/inprocsessionlogsink.h b/src/zenserver/sessions/inprocsessionlogsink.h
new file mode 100644
index 000000000..15f9b5ec3
--- /dev/null
+++ b/src/zenserver/sessions/inprocsessionlogsink.h
@@ -0,0 +1,30 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "sessions.h"
+
+#include <zencore/logging/sink.h>
+#include <zencore/session.h>
+
+#include <mutex>
+
+namespace zen {
+
+/// Log sink that forwards log messages to the server's own session
+/// in the SessionsService, making them visible in the sessions browser UI.
+class InProcSessionLogSink : public logging::Sink
+{
+public:
+ explicit InProcSessionLogSink(SessionsService& Service) : m_Service(Service), m_SessionId(GetSessionId()) {}
+
+ void Log(const logging::LogMessage& Msg) override;
+ void Flush() override {}
+ void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override {}
+
+private:
+ SessionsService& m_Service;
+ Oid m_SessionId;
+};
+
+} // namespace zen
diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp
index d919db6e9..1212ba5d8 100644
--- a/src/zenserver/sessions/sessions.cpp
+++ b/src/zenserver/sessions/sessions.cpp
@@ -46,6 +46,78 @@ SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info)
}
SessionsService::Session::~Session() = default;
+void
+SessionsService::Session::AppendLog(LogEntry Entry)
+{
+ RwLock::ExclusiveLockScope Lock(m_LogLock);
+ m_LogEntries.push_back(std::move(Entry));
+ ++m_TotalAppended;
+ while (m_LogEntries.size() > MaxLogEntries)
+ {
+ m_LogEntries.pop_front();
+ }
+}
+
+std::vector<SessionsService::LogEntry>
+SessionsService::Session::GetLogEntries(uint32_t Limit, uint32_t Offset) const
+{
+ RwLock::SharedLockScope Lock(m_LogLock);
+
+ const uint32_t Total = uint32_t(m_LogEntries.size());
+ if (Offset >= Total)
+ {
+ return {};
+ }
+
+ const uint32_t Available = Total - Offset;
+ const uint32_t Count = (Limit > 0) ? std::min(Limit, Available) : Available;
+
+ std::vector<LogEntry> Result;
+ Result.reserve(Count);
+ for (uint32_t i = Offset; i < Offset + Count; i++)
+ {
+ Result.push_back(m_LogEntries[i]);
+ }
+ return Result;
+}
+
+uint64_t
+SessionsService::Session::GetLogCount() const
+{
+ RwLock::SharedLockScope Lock(m_LogLock);
+ return m_LogEntries.size();
+}
+
+SessionsService::Session::CursorResult
+SessionsService::Session::GetLogEntriesAfter(uint64_t AfterCursor) const
+{
+ RwLock::SharedLockScope Lock(m_LogLock);
+
+ const uint64_t DequeSize = m_LogEntries.size();
+
+ // Cursor 0 means "give me everything currently in the deque".
+ // Otherwise, compute how many new entries were appended since the cursor.
+ uint64_t NewCount = (AfterCursor == 0) ? DequeSize : (m_TotalAppended > AfterCursor ? m_TotalAppended - AfterCursor : 0);
+
+ // Clamp to what's actually available in the deque (entries may have been evicted).
+ NewCount = std::min(NewCount, DequeSize);
+
+ std::vector<LogEntry> Result;
+ Result.reserve(NewCount);
+
+ const uint64_t StartIndex = DequeSize - NewCount;
+ for (uint64_t i = StartIndex; i < DequeSize; i++)
+ {
+ Result.push_back(m_LogEntries[i]);
+ }
+
+ return CursorResult{
+ .Entries = std::move(Result),
+ .Cursor = m_TotalAppended,
+ .Count = DequeSize,
+ };
+}
+
//////////////////////////////////////////////////////////////////////////
SessionsService::SessionsService() : m_Log(logging::Get("sessions"))
@@ -55,25 +127,31 @@ SessionsService::SessionsService() : m_Log(logging::Get("sessions"))
SessionsService::~SessionsService() = default;
bool
-SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata)
+SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
-
- if (m_Sessions.contains(SessionId))
+ // Log outside the lock scope — InProcSessionLogSink calls back into
+ // GetSession() which acquires m_Lock shared, so logging while holding
+ // m_Lock exclusively would deadlock.
{
- return false;
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ if (m_Sessions.contains(SessionId))
+ {
+ return false;
+ }
+
+ const DateTime Now = DateTime::Now();
+ m_Sessions.emplace(SessionId,
+ Ref(new Session(SessionInfo{.Id = SessionId,
+ .AppName = AppName,
+ .Mode = Mode,
+ .JobId = JobId,
+ .Metadata = CbObject::Clone(Metadata),
+ .CreatedAt = Now,
+ .UpdatedAt = Now})));
}
- ZEN_INFO("Session {} registered (AppName: {}, JobId: {})", SessionId, AppName, JobId);
-
- const DateTime Now = DateTime::Now();
- m_Sessions.emplace(SessionId,
- Ref(new Session(SessionInfo{.Id = SessionId,
- .AppName = std::move(AppName),
- .JobId = JobId,
- .Metadata = CbObject::Clone(Metadata),
- .CreatedAt = Now,
- .UpdatedAt = Now})));
+ ZEN_INFO("Session {} registered (AppName: {}, Mode: {}, JobId: {})", SessionId, AppName, Mode, JobId);
return true;
}
@@ -126,20 +204,39 @@ SessionsService::GetSessions() const
bool
SessionsService::RemoveSession(const Oid& SessionId)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
+ std::string RemovedAppName;
+ Oid RemovedJobId;
- auto It = m_Sessions.find(SessionId);
- if (It == m_Sessions.end())
{
- return false;
- }
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ auto It = m_Sessions.find(SessionId);
+ if (It == m_Sessions.end())
+ {
+ return false;
+ }
+
+ RemovedAppName = It.value()->Info().AppName;
+ RemovedJobId = It.value()->Info().JobId;
+
+ Ref<Session> Ended = It.value();
+ Ended->SetEndedAt(DateTime::Now());
+ m_EndedSessions.push_back(std::move(Ended));
- ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, It.value()->Info().AppName, It.value()->Info().JobId);
+ m_Sessions.erase(It);
+ }
- m_Sessions.erase(It);
+ ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, RemovedAppName, RemovedJobId);
return true;
}
+std::vector<Ref<SessionsService::Session>>
+SessionsService::GetEndedSessions() const
+{
+ RwLock::SharedLockScope Lock(m_Lock);
+ return m_EndedSessions;
+}
+
uint64_t
SessionsService::GetSessionCount() const
{
diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h
index db9704430..8f07bfc31 100644
--- a/src/zenserver/sessions/sessions.h
+++ b/src/zenserver/sessions/sessions.h
@@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <deque>
#include <optional>
#include <string>
#include <vector>
@@ -33,10 +34,20 @@ public:
{
Oid Id;
std::string AppName;
+ std::string Mode;
Oid JobId;
CbObject Metadata;
DateTime CreatedAt;
DateTime UpdatedAt;
+ DateTime EndedAt{0};
+ };
+
+ struct LogEntry
+ {
+ DateTime Timestamp;
+ std::string Level;
+ std::string Message;
+ CbObject Data;
};
class Session : public TRefCounted<Session>
@@ -51,23 +62,44 @@ public:
const SessionInfo& Info() const { return m_Info; }
void UpdateMetadata(CbObjectView Metadata)
{
- // Should this be additive rather than replacing the whole thing? We'll see.
m_Info.Metadata = CbObject::Clone(Metadata);
m_Info.UpdatedAt = DateTime::Now();
}
+ void SetEndedAt(DateTime When) { m_Info.EndedAt = When; }
+
+ void AppendLog(LogEntry Entry);
+ std::vector<LogEntry> GetLogEntries(uint32_t Limit = 0, uint32_t Offset = 0) const;
+ uint64_t GetLogCount() const;
+
+ /// Returns entries appended after the given cursor and the new cursor value.
+ /// A cursor of 0 returns all entries currently in the deque.
+ struct CursorResult
+ {
+ std::vector<LogEntry> Entries;
+ uint64_t Cursor; // new cursor for next poll
+ uint64_t Count; // current deque size
+ };
+ CursorResult GetLogEntriesAfter(uint64_t AfterCursor) const;
+
private:
- SessionInfo m_Info;
- Ref<SessionLog> m_Log;
+ SessionInfo m_Info;
+ Ref<SessionLog> m_Log;
+ mutable RwLock m_LogLock;
+ std::deque<LogEntry> m_LogEntries;
+ uint64_t m_TotalAppended = 0; // monotonically increasing counter
+
+ static constexpr uint32_t MaxLogEntries = 10000;
};
SessionsService();
~SessionsService();
- bool RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata);
- bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
- Ref<Session> GetSession(const Oid& SessionId) const;
+ bool RegisterSession(const Oid& SessionId, std::string AppName, std::string Mode, const Oid& JobId, CbObjectView Metadata);
+ bool UpdateSession(const Oid& SessionId, CbObjectView Metadata);
+ Ref<Session> GetSession(const Oid& SessionId) const;
std::vector<Ref<Session>> GetSessions() const;
+ std::vector<Ref<Session>> GetEndedSessions() const;
bool RemoveSession(const Oid& SessionId);
uint64_t GetSessionCount() const;
@@ -77,6 +109,7 @@ private:
LoggerRef m_Log;
mutable RwLock m_Lock;
tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions;
+ std::vector<Ref<Session>> m_EndedSessions;
std::unique_ptr<SessionLogStore> m_SessionLogs;
};