// Copyright Epic Games, Inc. All Rights Reserved.

// NOTE(review): in this copy of the file the #include operands below are not
// visible, and some template-argument lists (e.g. on static_cast, std::vector,
// std::unique_ptr, BlockingQueue) also appear to be missing — confirm against
// the original before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include

ZEN_THIRD_PARTY_INCLUDES_START
#include
ZEN_THIRD_PARTY_INCLUDES_END

namespace zen {

//////////////////////////////////////////////////////////////////////////
//
// SessionLogSink — thin enqueuer that posts log messages to the
// SessionsServiceClient worker thread via its BlockingQueue.
//
// The sink performs no I/O itself: each call builds a SessionCommand and
// hands it to the queue it was constructed with. The queue pointer is stored
// as-is and is not owned by the sink.
// NOTE(review): presumably the queue must outlive every sink created from it
// — confirm against how CreateLogSink() results are registered/unregistered.
//

class SessionLogSink final : public logging::Sink
{
public:
    // Stores the target queue and sets this sink's level to logging::Info.
    explicit SessionLogSink(BlockingQueue* Queue) : m_Queue(Queue) { SetLevel(logging::Info); }
    ~SessionLogSink() override = default;

    // Packages the message's level and payload into a Type::Log command and
    // enqueues it. The payload is copied into a CompactString, so the command
    // does not reference Msg after this call returns.
    void Log(const logging::LogMessage& Msg) override
    {
        SessionsServiceClient::SessionCommand Cmd;
        Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::Log;
        Cmd.LogLevel = Msg.GetLevel();
        Cmd.LogMessage = CompactString(Msg.GetPayload());
        m_Queue->Enqueue(std::move(Cmd));
    }

    // Enqueues a Type::FlushLogs command; the worker sends any batched log
    // entries when it processes that command (see WorkerLoop).
    void Flush() override
    {
        SessionsServiceClient::SessionCommand Cmd;
        Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::FlushLogs;
        m_Queue->Enqueue(std::move(Cmd));
    }

    // Intentionally ignores the supplied formatter.
    void SetFormatter(std::unique_ptr /*InFormatter*/) override
    {
        // No formatting needed - we send raw message text
    }

private:
    BlockingQueue* m_Queue;  // Non-owning; target of every enqueued command
};

//////////////////////////////////////////////////////////////////////////
//
// SessionsServiceClient
//

// Takes ownership of the options, normalizes them, and starts the worker
// thread that runs WorkerLoop().
//
// Normalization performed here:
//   - trailing '/' characters are removed from TargetUrl
//   - Platform is filled from GetRuntimePlatformName() when empty
//   - ClientPid is filled from GetCurrentProcessId() when it is 0 and the
//     target looks local (see below)
//
// Note that m_SessionPath is formatted from m_Options.SessionId in the
// initializer list, i.e. before any of the normalization in the body runs.
SessionsServiceClient::SessionsServiceClient(Options Opts)
: m_Log(logging::Get("sessionsclient"))
, m_Options(std::move(Opts))
, m_SessionPath(fmt::format("/sessions/{}", m_Options.SessionId))
{
    // Strip trailing slash to avoid double-slash when appending paths like /sessions/{id}
    while (m_Options.TargetUrl.ends_with('/'))
    {
        m_Options.TargetUrl.pop_back();
    }

    // Auto-detect the platform if the caller didn't set one explicitly.
    if (m_Options.Platform.empty())
    {
        m_Options.Platform = std::string(GetRuntimePlatformName());
    }

    // Auto-fill ClientPid when we can reasonably assume the target is on the
    // same machine. The server ALSO defensively gates pid acceptance on
    // IsLocalMachineRequest(), so sending a pid for a non-local URL doesn't
    // cause false positives — this heuristic just avoids the redundant send.
    //
    // "Looks local" means: a Unix socket path is configured, or TargetUrl
    // contains the substring "localhost" or "127.0.0.1" anywhere in it.
    if (m_Options.ClientPid == 0)
    {
        const bool IsUnixSocket = !m_Options.ClientSettings.UnixSocketPath.empty();
        const bool LooksLocal = IsUnixSocket || m_Options.TargetUrl.find("localhost") != std::string::npos ||
                                m_Options.TargetUrl.find("127.0.0.1") != std::string::npos;
        if (LooksLocal)
        {
            m_Options.ClientPid = static_cast(GetCurrentProcessId());
        }
    }

    // The worker captures `this`; the destructor joins it before members are
    // destroyed.
    m_WorkerThread = std::thread([this]() {
        zen::SetCurrentThreadName("SessionIO");
        WorkerLoop();
    });
}

// Requests worker shutdown and waits for it to finish.
//
// A Shutdown command is enqueued first, then the queue is marked complete, and
// finally the worker thread is joined (if joinable). The worker's Shutdown
// handling sends pending log entries and issues the session removal if it has
// not been issued already (see WorkerLoop).
SessionsServiceClient::~SessionsServiceClient()
{
    SessionCommand ShutdownCmd;
    ShutdownCmd.CommandType = SessionCommand::Type::Shutdown;
    m_Queue.Enqueue(std::move(ShutdownCmd));
    m_Queue.CompleteAdding();
    if (m_WorkerThread.joinable())
    {
        m_WorkerThread.join();
    }
}

// Builds the compact-binary request body shared by announce and update.
//
// Always writes "appname". Writes "mode", "platform", "pid",
// "parent_session_id" and "jobid" only when the corresponding option is
// non-empty / non-zero, and attaches Metadata under "metadata" only when the
// view is valid.
//
// Returns the saved object.
CbObject
SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const
{
    CbObjectWriter Writer;
    Writer << "appname" << m_Options.AppName;
    if (!m_Options.Mode.empty())
    {
        Writer << "mode" << m_Options.Mode;
    }
    if (!m_Options.Platform.empty())
    {
        Writer << "platform" << m_Options.Platform;
    }
    if (m_Options.ClientPid != 0)
    {
        Writer << "pid" << m_Options.ClientPid;
    }
    if (m_Options.ParentSessionId != Oid::Zero)
    {
        Writer << "parent_session_id" << m_Options.ParentSessionId;
    }
    if (m_Options.JobId != Oid::Zero)
    {
        Writer << "jobid" << m_Options.JobId;
    }
    if (Metadata)
    {
        Writer.AddObject("metadata", Metadata);
    }
    return Writer.Save();
}

//////////////////////////////////////////////////////////////////////////
// Public API — non-blocking enqueuers
//
// Each function below only builds a SessionCommand and enqueues it; the HTTP
// request itself is issued later by the worker thread.

// Queues an Announce command. A valid Metadata view is cloned into the
// command so the caller's buffer need not stay alive.
void
SessionsServiceClient::Announce(CbObjectView Metadata)
{
    SessionCommand Cmd;
    Cmd.CommandType = SessionCommand::Type::Announce;
    if (Metadata)
    {
        Cmd.Metadata = CbObject::Clone(Metadata);
    }
    m_Queue.Enqueue(std::move(Cmd));
}

// Queues an UpdateMetadata command. A valid Metadata view is cloned into the
// command so the caller's buffer need not stay alive.
void
SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
{
    SessionCommand Cmd;
    Cmd.CommandType = SessionCommand::Type::UpdateMetadata;
    if (Metadata)
    {
        Cmd.Metadata = CbObject::Clone(Metadata);
    }
    m_Queue.Enqueue(std::move(Cmd));
}

// Queues a Remove command. The worker issues the removal request at most once
// (see the Removed flag in WorkerLoop).
void
SessionsServiceClient::Remove()
{
    SessionCommand Cmd;
    Cmd.CommandType = SessionCommand::Type::Remove;
    m_Queue.Enqueue(std::move(Cmd));
}

// Creates a new SessionLogSink that enqueues into this client's queue.
// The sink holds a raw pointer to m_Queue.
// NOTE(review): presumably the returned sink must not be used after this
// client is destroyed — confirm at the call sites.
logging::SinkPtr
SessionsServiceClient::CreateLogSink()
{
    return Ref(new SessionLogSink(&m_Queue));
}

//////////////////////////////////////////////////////////////////////////
// Worker thread — processes all session HTTP I/O

// POSTs the announce body to m_SessionPath.
//
// Failures are reported with ZEN_WARN and never propagate: a transport error
// (Result.Error set), a non-OK HTTP status, and any std::exception are each
// logged and swallowed. Success is logged with ZEN_DEBUG.
void
SessionsServiceClient::DoAnnounce(HttpClient& Http, CbObjectView Metadata)
{
    try
    {
        CbObject Body = BuildRequestBody(Metadata);
        HttpClient::Response Result = Http.Post(m_SessionPath, std::move(Body));
        if (Result.Error)
        {
            ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}",
                     m_Options.TargetUrl,
                     static_cast(Result.Error->ErrorCode),
                     Result.Error->ErrorMessage);
            return;
        }
        if (!IsHttpOk(Result.StatusCode))
        {
            ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode));
            return;
        }
        ZEN_DEBUG("session announced to '{}'", m_Options.TargetUrl);
    }
    catch (const std::exception& Ex)
    {
        ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what());
    }
}

// PUTs the request body to m_SessionPath.
//
// The body is copied into an IoBuffer tagged ZenContentType::kCbObject before
// sending. Error handling mirrors DoAnnounce (warn and swallow); unlike
// DoAnnounce and DoRemove, nothing is logged on success.
void
SessionsServiceClient::DoUpdateMetadata(HttpClient& Http, CbObjectView Metadata)
{
    try
    {
        CbObject Body = BuildRequestBody(Metadata);
        MemoryView View = Body.GetView();
        IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject);
        HttpClient::Response Result = Http.Put(m_SessionPath, Payload);
        if (Result.Error)
        {
            ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}",
                     m_Options.TargetUrl,
                     static_cast(Result.Error->ErrorCode),
                     Result.Error->ErrorMessage);
            return;
        }
        if (!IsHttpOk(Result.StatusCode))
        {
            ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode));
            return;
        }
    }
    catch (const std::exception& Ex)
    {
        ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what());
    }
}

// Sends DELETE for m_SessionPath.
//
// Transport errors, non-OK statuses and std::exception are logged with
// ZEN_WARN and swallowed; success is logged with ZEN_DEBUG.
void
SessionsServiceClient::DoRemove(HttpClient& Http)
{
    try
    {
        HttpClient::Response Result = Http.Delete(m_SessionPath);
        if (Result.Error)
        {
            ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}",
                     m_Options.TargetUrl,
                     static_cast(Result.Error->ErrorCode),
                     Result.Error->ErrorMessage);
            return;
        }
        if (!IsHttpOk(Result.StatusCode))
        {
            ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode));
            return;
        }
        ZEN_DEBUG("session removed from '{}'", m_Options.TargetUrl);
    }
    catch (const std::exception& Ex)
    {
        ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what());
    }
}

// POSTs a batch of log commands to LogPath as one object of the form
//   { "entries": [ { "level": ..., "message": ... }, ... ] }
//
// Delivery is best-effort: the response is ignored and std::exception is
// swallowed without logging.
void
SessionsServiceClient::SendLogBatch(HttpClient& Http, const std::string& LogPath, const std::vector& Batch)
{
    try
    {
        CbObjectWriter Writer;
        Writer.BeginArray("entries");
        for (const SessionCommand& Entry : Batch)
        {
            Writer.BeginObject();
            Writer << "level" << static_cast(Entry.LogLevel);
            Writer << "message" << Entry.LogMessage.c_str();
            Writer.EndObject();
        }
        Writer.EndArray();
        HttpClient::Response Result = Http.Post(LogPath, Writer.Save());
        (void)Result;  // Best-effort
    }
    catch (const std::exception&)
    {
        // Best-effort — silently discard on failure
    }
}

// Body of the worker thread started by the constructor.
//
// Creates one HttpClient for TargetUrl (using the configured client settings
// with ConnectTimeout overridden to 3000 ms and Timeout to 5000 ms) and then
// processes commands from m_Queue until a Shutdown command is handled or the
// outer WaitAndDequeue() returns false.
//
// Log commands are accumulated in LogBatch and sent to "<session path>/log"
// when the batch reaches BatchSize entries, when FlushLogs arrives, before any
// Announce / UpdateMetadata / Remove request is issued, and at the end of each
// outer-loop iteration.
void
SessionsServiceClient::WorkerLoop()
{
    HttpClientSettings Settings = m_Options.ClientSettings;
    Settings.ConnectTimeout = std::chrono::milliseconds(3000);
    Settings.Timeout = std::chrono::milliseconds(5000);
    HttpClient Http(m_Options.TargetUrl, Settings);

    std::string LogPath = m_SessionPath + "/log";

    // Set once DoRemove() has been issued so that an explicit Remove followed
    // by Shutdown does not send the removal twice.
    bool Removed = false;

    static constexpr size_t BatchSize = 50;
    std::vector LogBatch;
    LogBatch.reserve(BatchSize);

    // Sends the accumulated log entries (if any) and empties the batch.
    auto FlushLogBatch = [&]() {
        if (!LogBatch.empty())
        {
            SendLogBatch(Http, LogPath, LogBatch);
            LogBatch.clear();
        }
    };

    // Returns false to signal loop exit (Shutdown received)
    auto ProcessCommand = [&](SessionCommand& Cmd) -> bool {
        switch (Cmd.CommandType)
        {
            case SessionCommand::Type::Log:
                // Cmd is moved into the batch; the caller must not rely on its
                // contents afterwards.
                LogBatch.push_back(std::move(Cmd));
                if (LogBatch.size() >= BatchSize)
                {
                    FlushLogBatch();
                }
                return true;

            case SessionCommand::Type::FlushLogs:
                FlushLogBatch();
                return true;

            case SessionCommand::Type::Announce:
                // Pending log entries are sent before the request.
                FlushLogBatch();
                DoAnnounce(Http, Cmd.Metadata);
                return true;

            case SessionCommand::Type::UpdateMetadata:
                FlushLogBatch();
                DoUpdateMetadata(Http, Cmd.Metadata);
                return true;

            case SessionCommand::Type::Remove:
                FlushLogBatch();
                if (!Removed)
                {
                    Removed = true;
                    DoRemove(Http);
                }
                return true;

            case SessionCommand::Type::Shutdown:
            {
                // Drain remaining log entries from the queue
                //
                // Only Log commands are kept; any other command dequeued here
                // is dropped. The drained entries are sent as a single batch,
                // which may exceed BatchSize.
                // NOTE(review): presumably WaitAndDequeue() returns false once
                // CompleteAdding() has been called and the queue is empty —
                // confirm against BlockingQueue.
                SessionCommand Remaining;
                while (m_Queue.WaitAndDequeue(Remaining))
                {
                    if (Remaining.CommandType == SessionCommand::Type::Log)
                    {
                        LogBatch.push_back(std::move(Remaining));
                    }
                }
                FlushLogBatch();
                if (!Removed)
                {
                    Removed = true;
                    DoRemove(Http);
                }
                return false;
            }
        }
        return true;
    };

    SessionCommand Cmd;
    while (m_Queue.WaitAndDequeue(Cmd))
    {
        if (!ProcessCommand(Cmd))
        {
            return;
        }

        // Drain additional queued entries without blocking (batching optimization)
        // Size() is checked first so WaitAndDequeue() is only called when an
        // entry is already queued.
        while (LogBatch.size() < BatchSize && m_Queue.Size() > 0)
        {
            SessionCommand Extra;
            if (m_Queue.WaitAndDequeue(Extra))
            {
                if (!ProcessCommand(Extra))
                {
                    return;
                }
            }
        }

        // Send whatever was gathered before blocking on the queue again.
        FlushLogBatch();
    }
}

}  // namespace zen