diff options
| author | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
| commit | d1abc50ee9d4fb72efc646e17decafea741caa34 (patch) | |
| tree | e4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zenserver/sessions | |
| parent | Allow requests with invalid content-types unless specified in command line or... (diff) | |
| parent | updated chunk–block analyser (#818) (diff) | |
| download | zen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip | |
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zenserver/sessions')
| -rw-r--r-- | src/zenserver/sessions/httpsessions.cpp | 264 | ||||
| -rw-r--r-- | src/zenserver/sessions/httpsessions.h | 55 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.cpp | 150 | ||||
| -rw-r--r-- | src/zenserver/sessions/sessions.h | 83 |
4 files changed, 552 insertions, 0 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp new file mode 100644 index 000000000..05be3c814 --- /dev/null +++ b/src/zenserver/sessions/httpsessions.cpp @@ -0,0 +1,264 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpsessions.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/trace.h> +#include "sessions.h" + +namespace zen { +using namespace std::literals; + +HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions) +: m_Log(logging::Get("sessions")) +, m_StatusService(StatusService) +, m_StatsService(StatsService) +, m_Sessions(Sessions) +{ + Initialize(); +} + +HttpSessionsService::~HttpSessionsService() +{ + m_StatsService.UnregisterHandler("sessions", *this); + m_StatusService.UnregisterHandler("sessions", *this); +} + +const char* +HttpSessionsService::BaseUri() const +{ + return "/sessions/"; +} + +void +HttpSessionsService::HandleRequest(HttpServerRequest& Request) +{ + metrics::OperationTiming::Scope $(m_HttpRequests); + + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); + } +} + +CbObject +HttpSessionsService::CollectStats() +{ + ZEN_TRACE_CPU("SessionsService::Stats"); + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo.BeginObject("sessions"); + { + Cbo << "readcount" << m_SessionsStats.SessionReadCount; + Cbo << "writecount" << m_SessionsStats.SessionWriteCount; + Cbo << "deletecount" << m_SessionsStats.SessionDeleteCount; + Cbo << "listcount" << m_SessionsStats.SessionListCount; + Cbo << "requestcount" << m_SessionsStats.RequestCount; + Cbo << "badrequestcount" << m_SessionsStats.BadRequestCount; + Cbo << "count" << m_Sessions.GetSessionCount(); + } + Cbo.EndObject(); + + return Cbo.Save(); +} + +void +HttpSessionsService::HandleStatsRequest(HttpServerRequest& HttpReq) +{ + HttpReq.WriteResponse(HttpResponseCode::OK, CollectStats()); +} + +void +HttpSessionsService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpSessionsService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpSessionsService::Initialize() +{ + using namespace std::literals; + + ZEN_INFO("Initializing Sessions Service"); + + static constexpr AsciiSet ValidHexCharactersSet{"0123456789abcdefABCDEF"}; + + m_Router.AddMatcher("session_id", [](std::string_view Str) -> bool { + return Str.length() == Oid::StringLength && AsciiSet::HasOnly(Str, ValidHexCharactersSet); + }); + + m_Router.RegisterRoute( + "list", + [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{session_id}", + [this](HttpRouterRequest& Req) { SessionRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "", + [this](HttpRouterRequest& Req) { ListSessionsRequest(Req); }, + HttpVerb::kGet); + + m_StatsService.RegisterHandler("sessions", *this); + m_StatusService.RegisterHandler("sessions", *this); +} + +static void +WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info) +{ + Writer << "id" << Info.Id; + if (!Info.AppName.empty()) + { + Writer << "appname" << Info.AppName; + } + if (Info.JobId != Oid::Zero) + { + Writer << "jobid" << Info.JobId; + } + Writer << "created_at" << Info.CreatedAt; + Writer << "updated_at" << Info.UpdatedAt; + + if (Info.Metadata.GetSize() > 0) + { + Writer.BeginObject("metadata"); + for (const CbField& Field : Info.Metadata) + { + Writer.AddField(Field); + } + Writer.EndObject(); + } +} + +void +HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + + m_SessionsStats.SessionListCount++; + m_SessionsStats.RequestCount++; + + std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions(); + + CbObjectWriter Response; + Response.BeginArray("sessions"); + for (const Ref<SessionsService::Session>& Session : Sessions) + { + Response.BeginObject(); + { + WriteSessionInfo(Response, Session->Info()); + } + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); +} + +void +HttpSessionsService::SessionRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + + const Oid SessionId = Oid::TryFromHexString(Req.GetCapture(1)); + if (SessionId == Oid::Zero) + { + m_SessionsStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid session id '{}'", Req.GetCapture(1))); + } + + m_SessionsStats.RequestCount++; + + switch (ServerRequest.RequestVerb()) + { + case HttpVerb::kPost: + case HttpVerb::kPut: + { + IoBuffer Payload = ServerRequest.ReadPayload(); + CbObject RequestObject; + + if (Payload.GetSize() > 0) + { + if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All); + ValidationResult != CbValidateError::None) + { + m_SessionsStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid payload: {}", zen::ToString(ValidationResult))); + } + RequestObject = LoadCompactBinaryObject(Payload); + } + + if (ServerRequest.RequestVerb() == HttpVerb::kPost) + { + std::string AppName(RequestObject["appname"sv].AsString()); + Oid JobId = RequestObject["jobid"sv].AsObjectId(); + CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView(); + + m_SessionsStats.SessionWriteCount++; + if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView)) + { + return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId)); + } + else + { + // Already exists - try update instead + if (m_Sessions.UpdateSession(SessionId, MetadataView)) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + } + return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError); + } + } + else + { + // PUT - update only + m_SessionsStats.SessionWriteCount++; + if (m_Sessions.UpdateSession(SessionId, RequestObject["metadata"sv].AsObjectView())) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", SessionId)); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Session '{}' not found", SessionId)); + } + } + case HttpVerb::kGet: + { + m_SessionsStats.SessionReadCount++; + Ref<SessionsService::Session> Session = m_Sessions.GetSession(SessionId); + if (Session) + { + CbObjectWriter Response; + WriteSessionInfo(Response, Session->Info()); + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + case HttpVerb::kDelete: + { + m_SessionsStats.SessionDeleteCount++; + if (m_Sessions.RemoveSession(SessionId)) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + } +} + +} // namespace zen diff --git a/src/zenserver/sessions/httpsessions.h b/src/zenserver/sessions/httpsessions.h new file mode 100644 index 000000000..e07f3b59b --- /dev/null +++ b/src/zenserver/sessions/httpsessions.h @@ -0,0 +1,55 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> +#include <zenhttp/httpstats.h> +#include <zenhttp/httpstatus.h> +#include <zentelemetry/stats.h> + +namespace zen { + +class SessionsService; + +class HttpSessionsService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider +{ +public: + HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions); + virtual ~HttpSessionsService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + virtual CbObject CollectStats() override; + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; + +private: + struct SessionsStats + { + std::atomic_uint64_t SessionReadCount{}; + std::atomic_uint64_t SessionWriteCount{}; + std::atomic_uint64_t SessionDeleteCount{}; + std::atomic_uint64_t SessionListCount{}; + std::atomic_uint64_t RequestCount{}; + std::atomic_uint64_t BadRequestCount{}; + }; + + inline LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + void Initialize(); + + void ListSessionsRequest(HttpRouterRequest& Req); + void SessionRequest(HttpRouterRequest& Req); + + HttpStatusService& m_StatusService; + HttpStatsService& m_StatsService; + HttpRequestRouter m_Router; + SessionsService& m_Sessions; + SessionsStats m_SessionsStats; + metrics::OperationTiming m_HttpRequests; +}; + +} // namespace zen diff --git a/src/zenserver/sessions/sessions.cpp b/src/zenserver/sessions/sessions.cpp new file mode 100644 index 000000000..f73aa40ff --- /dev/null +++ b/src/zenserver/sessions/sessions.cpp @@ -0,0 +1,150 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "sessions.h" + +#include <zencore/basicfile.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +namespace zen { +using namespace std::literals; + +class SessionLog : public TRefCounted<SessionLog> +{ +public: + SessionLog(std::filesystem::path LogFilePath) { m_LogFile.Open(LogFilePath, BasicFile::Mode::kWrite); } + +private: + BasicFile m_LogFile; +}; + +class SessionLogStore +{ +public: + SessionLogStore(std::filesystem::path StoragePath) : m_StoragePath(std::move(StoragePath)) {} + + ~SessionLogStore() = default; + + Ref<SessionLog> GetLogForSession(const Oid& SessionId) + { + // For now, just return a new log for each session. We can implement actual log storage and retrieval later. + return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + } + + Ref<SessionLog> CreateLogForSession(const Oid& SessionId) + { + // For now, just return a new log for each session. We can implement actual log storage and retrieval later. + return Ref(new SessionLog(m_StoragePath / (SessionId.ToString() + ".log"))); + } + +private: + std::filesystem::path m_StoragePath; +}; + +SessionsService::Session::Session(const SessionInfo& Info) : m_Info(Info) +{ +} +SessionsService::Session::~Session() = default; + +////////////////////////////////////////////////////////////////////////// + +SessionsService::SessionsService() : m_Log(logging::Get("sessions")) +{ +} + +SessionsService::~SessionsService() = default; + +bool +SessionsService::RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata) +{ + RwLock::ExclusiveLockScope Lock(m_Lock); + + if (m_Sessions.contains(SessionId)) + { + return false; + } + + const DateTime Now = DateTime::Now(); + m_Sessions.emplace(SessionId, + Ref(new Session(SessionInfo{.Id = SessionId, + .AppName = std::move(AppName), + .JobId = JobId, + .Metadata = CbObject::Clone(Metadata), + .CreatedAt = Now, + .UpdatedAt = Now}))); + + ZEN_INFO("Session {} registered (AppName: {}, JobId: {})", SessionId, AppName, JobId); + return true; +} + +bool +SessionsService::UpdateSession(const Oid& SessionId, CbObjectView Metadata) +{ + RwLock::ExclusiveLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return false; + } + + It.value()->UpdateMetadata(Metadata); + + const SessionInfo& Info = It.value()->Info(); + ZEN_DEBUG("Session {} updated (AppName: {}, JobId: {})", SessionId, Info.AppName, Info.JobId); + return true; +} + +Ref<SessionsService::Session> +SessionsService::GetSession(const Oid& SessionId) const +{ + RwLock::SharedLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return {}; + } + + return It->second; +} + +std::vector<Ref<SessionsService::Session>> +SessionsService::GetSessions() const +{ + RwLock::SharedLockScope Lock(m_Lock); + + std::vector<Ref<Session>> Result; + Result.reserve(m_Sessions.size()); + for (const auto& [Id, SessionRef] : m_Sessions) + { + Result.push_back(SessionRef); + } + return Result; +} + +bool +SessionsService::RemoveSession(const Oid& SessionId) +{ + RwLock::ExclusiveLockScope Lock(m_Lock); + + auto It = m_Sessions.find(SessionId); + if (It == m_Sessions.end()) + { + return false; + } + + ZEN_INFO("Session {} removed (AppName: {}, JobId: {})", SessionId, It.value()->Info().AppName, It.value()->Info().JobId); + + m_Sessions.erase(It); + return true; +} + +uint64_t +SessionsService::GetSessionCount() const +{ + RwLock::SharedLockScope Lock(m_Lock); + return m_Sessions.size(); +} + +} // namespace zen diff --git a/src/zenserver/sessions/sessions.h b/src/zenserver/sessions/sessions.h new file mode 100644 index 000000000..db9704430 --- /dev/null +++ b/src/zenserver/sessions/sessions.h @@ -0,0 +1,83 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/logbase.h> +#include <zencore/thread.h> +#include <zencore/uid.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <optional> +#include <string> +#include <vector> + +namespace zen { + +class SessionLogStore; +class SessionLog; + +/** Session tracker + * + * Acts as a log and session info concentrator when dealing with multiple + * servers and external processes acting as a group. + */ + +class SessionsService +{ +public: + struct SessionInfo + { + Oid Id; + std::string AppName; + Oid JobId; + CbObject Metadata; + DateTime CreatedAt; + DateTime UpdatedAt; + }; + + class Session : public TRefCounted<Session> + { + public: + Session(const SessionInfo& Info); + ~Session(); + + Session(Session&&) = delete; + Session& operator=(Session&&) = delete; + + const SessionInfo& Info() const { return m_Info; } + void UpdateMetadata(CbObjectView Metadata) + { + // Should this be additive rather than replacing the whole thing? We'll see. + m_Info.Metadata = CbObject::Clone(Metadata); + m_Info.UpdatedAt = DateTime::Now(); + } + + private: + SessionInfo m_Info; + Ref<SessionLog> m_Log; + }; + + SessionsService(); + ~SessionsService(); + + bool RegisterSession(const Oid& SessionId, std::string AppName, const Oid& JobId, CbObjectView Metadata); + bool UpdateSession(const Oid& SessionId, CbObjectView Metadata); + Ref<Session> GetSession(const Oid& SessionId) const; + std::vector<Ref<Session>> GetSessions() const; + bool RemoveSession(const Oid& SessionId); + uint64_t GetSessionCount() const; + +private: + LoggerRef& Log() { return m_Log; } + + LoggerRef m_Log; + mutable RwLock m_Lock; + tsl::robin_map<Oid, Ref<Session>, Oid::Hasher> m_Sessions; + std::unique_ptr<SessionLogStore> m_SessionLogs; +}; + +} // namespace zen |