// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { ////////////////////////////////////////////////////////////////////////// // // SessionLogSink — batching log sink that forwards to /sessions/{id}/log // static const char* LogLevelToString(logging::LogLevel Level) { switch (Level) { case logging::Trace: return "trace"; case logging::Debug: return "debug"; case logging::Info: return "info"; case logging::Warn: return "warn"; case logging::Err: return "error"; case logging::Critical: return "critical"; default: return "info"; } } struct BufferedLogEntry { enum class Type : uint8_t { Log, Flush, Shutdown }; Type Type = Type::Log; std::string Level; std::string Message; }; class SessionLogSink final : public logging::Sink { public: SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) { HttpClientSettings Settings; Settings.ConnectTimeout = std::chrono::milliseconds(3000); m_Http = std::make_unique(std::move(TargetUrl), Settings); SetLevel(logging::Info); m_WorkerThread = std::thread([this]() { zen::SetCurrentThreadName("SessionLog"); WorkerLoop(); }); } ~SessionLogSink() override { BufferedLogEntry ShutdownMsg; ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; m_Queue.Enqueue(std::move(ShutdownMsg)); if (m_WorkerThread.joinable()) { m_WorkerThread.join(); } } void Log(const logging::LogMessage& Msg) override { BufferedLogEntry Entry; Entry.Type = BufferedLogEntry::Type::Log; Entry.Level = LogLevelToString(Msg.GetLevel()); Entry.Message = std::string(Msg.GetPayload()); m_Queue.Enqueue(std::move(Entry)); } void Flush() override { // Best-effort: enqueue a flush marker so the worker sends any pending entries BufferedLogEntry FlushMsg; FlushMsg.Type = BufferedLogEntry::Type::Flush; m_Queue.Enqueue(std::move(FlushMsg)); } void SetFormatter(std::unique_ptr /*InFormatter*/) override { // No formatting needed — we send raw message text } private: static constexpr size_t BatchSize = 50; void WorkerLoop() { std::vector Batch; Batch.reserve(BatchSize); BufferedLogEntry Msg; while (m_Queue.WaitAndDequeue(Msg)) { if (Msg.Type == BufferedLogEntry::Type::Shutdown) { // Drain remaining log entries BufferedLogEntry Remaining; while (m_Queue.WaitAndDequeue(Remaining)) { if (Remaining.Type == BufferedLogEntry::Type::Log) { Batch.push_back(std::move(Remaining)); } } if (!Batch.empty()) { SendBatch(Batch); } return; } if (Msg.Type == BufferedLogEntry::Type::Flush) { if (!Batch.empty()) { SendBatch(Batch); Batch.clear(); } continue; } // Log entry Batch.push_back(std::move(Msg)); if (Batch.size() >= BatchSize) { SendBatch(Batch); Batch.clear(); } else { // Drain any additional queued entries without blocking while (Batch.size() < BatchSize && m_Queue.Size() > 0) { BufferedLogEntry Extra; if (m_Queue.WaitAndDequeue(Extra)) { if (Extra.Type == BufferedLogEntry::Type::Shutdown) { if (!Batch.empty()) { SendBatch(Batch); } // Drain remaining while (m_Queue.WaitAndDequeue(Extra)) { if (Extra.Type == BufferedLogEntry::Type::Log) { Batch.push_back(std::move(Extra)); } } if (!Batch.empty()) { SendBatch(Batch); } return; } if (Extra.Type == BufferedLogEntry::Type::Log) { Batch.push_back(std::move(Extra)); } else if (Extra.Type == BufferedLogEntry::Type::Flush) { break; } } } if (!Batch.empty()) { SendBatch(Batch); Batch.clear(); } } } } void SendBatch(const std::vector& Batch) { try { CbObjectWriter Writer; Writer.BeginArray("entries"); for (const BufferedLogEntry& Entry : Batch) { Writer.BeginObject(); Writer << "level" << Entry.Level; Writer << "message" << Entry.Message; Writer.EndObject(); } Writer.EndArray(); HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); (void)Result; // Best-effort } catch (const std::exception&) { // Best-effort — silently discard on failure } } std::string m_LogPath; std::unique_ptr m_Http; BlockingQueue m_Queue; std::thread m_WorkerThread; }; SessionsServiceClient::SessionsServiceClient(Options Opts) : m_Log(logging::Get("sessionsclient")) , m_Options(std::move(Opts)) , m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) { HttpClientSettings Settings; Settings.ConnectTimeout = std::chrono::milliseconds(3000); m_Http = std::make_unique(m_Options.TargetUrl, Settings); } SessionsServiceClient::~SessionsServiceClient() = default; CbObject SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const { CbObjectWriter Writer; Writer << "appname" << m_Options.AppName; if (!m_Options.Mode.empty()) { Writer << "mode" << m_Options.Mode; } if (m_Options.JobId != Oid::Zero) { Writer << "jobid" << m_Options.JobId; } if (Metadata.GetSize() > 0) { Writer.AddObject("metadata", Metadata); } return Writer.Save(); } bool SessionsServiceClient::Announce(CbObjectView Metadata) { try { CbObject Body = BuildRequestBody(Metadata); HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); if (Result.Error) { ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}", m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); return false; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); return false; } ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); return true; } catch (const std::exception& Ex) { ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); return false; } } bool SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) { try { CbObject Body = BuildRequestBody(Metadata); MemoryView View = Body.GetView(); IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); if (Result.Error) { ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}", m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); return false; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); return false; } return true; } catch (const std::exception& Ex) { ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); return false; } } bool SessionsServiceClient::Remove() { try { HttpClient::Response Result = m_Http->Delete(m_SessionPath); if (Result.Error) { ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}", m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); return false; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); return false; } ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); return true; } catch (const std::exception& Ex) { ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); return false; } } logging::SinkPtr SessionsServiceClient::CreateLogSink() { std::string LogPath = m_SessionPath + "/log"; return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); } } // namespace zen