diff options
Diffstat (limited to 'src/zenutil/sessionsclient.cpp')
| -rw-r--r-- | src/zenutil/sessionsclient.cpp | 377 |
1 files changed, 377 insertions, 0 deletions
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp new file mode 100644 index 000000000..c62cc4099 --- /dev/null +++ b/src/zenutil/sessionsclient.cpp @@ -0,0 +1,377 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/sessionsclient.h> + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging/logmsg.h> +#include <zencore/thread.h> +#include <zenhttp/httpclient.h> + +#include <thread> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// +// SessionLogSink — batching log sink that forwards to /sessions/{id}/log +// + +static const char* +LogLevelToString(logging::LogLevel Level) +{ + switch (Level) + { + case logging::Trace: + return "trace"; + case logging::Debug: + return "debug"; + case logging::Info: + return "info"; + case logging::Warn: + return "warn"; + case logging::Err: + return "error"; + case logging::Critical: + return "critical"; + default: + return "info"; + } +} + +struct BufferedLogEntry +{ + enum class Type : uint8_t + { + Log, + Flush, + Shutdown + }; + + Type Type = Type::Log; + std::string Level; + std::string Message; +}; + +class SessionLogSink final : public logging::Sink +{ +public: + SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) + { + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings); + + SetLevel(logging::Info); + + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("SessionLog"); + WorkerLoop(); + }); + } + + ~SessionLogSink() override + { + BufferedLogEntry ShutdownMsg; + ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownMsg)); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } + } + + void Log(const logging::LogMessage& Msg) override + { + BufferedLogEntry Entry; + Entry.Type = BufferedLogEntry::Type::Log; + Entry.Level = LogLevelToString(Msg.GetLevel()); + Entry.Message = std::string(Msg.GetPayload()); + m_Queue.Enqueue(std::move(Entry)); + } + + void Flush() override + { + // Best-effort: enqueue a flush marker so the worker sends any pending entries + BufferedLogEntry FlushMsg; + FlushMsg.Type = BufferedLogEntry::Type::Flush; + m_Queue.Enqueue(std::move(FlushMsg)); + } + + void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override + { + // No formatting needed — we send raw message text + } + +private: + static constexpr size_t BatchSize = 50; + + void WorkerLoop() + { + std::vector<BufferedLogEntry> Batch; + Batch.reserve(BatchSize); + + BufferedLogEntry Msg; + while (m_Queue.WaitAndDequeue(Msg)) + { + if (Msg.Type == BufferedLogEntry::Type::Shutdown) + { + // Drain remaining log entries + BufferedLogEntry Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Remaining)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + + if (Msg.Type == BufferedLogEntry::Type::Flush) + { + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + continue; + } + + // Log entry + Batch.push_back(std::move(Msg)); + + if (Batch.size() >= BatchSize) + { + SendBatch(Batch); + Batch.clear(); + } + else + { + // Drain any additional queued entries without blocking + while (Batch.size() < BatchSize && m_Queue.Size() > 0) + { + BufferedLogEntry Extra; + if (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Shutdown) + { + if (!Batch.empty()) + { + SendBatch(Batch); + } + // Drain remaining + while (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + else if (Extra.Type == BufferedLogEntry::Type::Flush) + { + break; + } + } + } + + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + } + } + } + + void SendBatch(const std::vector<BufferedLogEntry>& Batch) + { + try + { + CbObjectWriter Writer; + Writer.BeginArray("entries"); + for (const BufferedLogEntry& Entry : Batch) + { + Writer.BeginObject(); + Writer << "level" << Entry.Level; + Writer << "message" << Entry.Message; + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); + (void)Result; // Best-effort + } + catch (const std::exception&) + { + // Best-effort — silently discard on failure + } + } + + std::string m_LogPath; + std::unique_ptr<HttpClient> m_Http; + BlockingQueue<BufferedLogEntry> m_Queue; + std::thread m_WorkerThread; +}; + +SessionsServiceClient::SessionsServiceClient(Options Opts) +: m_Log(logging::Get("sessionsclient")) +, m_Options(std::move(Opts)) +, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) +{ + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings); +} + +SessionsServiceClient::~SessionsServiceClient() = default; + +CbObject +SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const +{ + CbObjectWriter Writer; + Writer << "appname" << m_Options.AppName; + if (!m_Options.Mode.empty()) + { + Writer << "mode" << m_Options.Mode; + } + if (m_Options.JobId != Oid::Zero) + { + Writer << "jobid" << m_Options.JobId; + } + if (Metadata.GetSize() > 0) + { + Writer.AddObject("metadata", Metadata); + } + return Writer.Save(); +} + +bool +SessionsServiceClient::Announce(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); + + if (Result.Error) + { + ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + MemoryView View = Body.GetView(); + IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); + + HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); + + if (Result.Error) + { + ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::Remove() +{ + try + { + HttpClient::Response Result = m_Http->Delete(m_SessionPath); + + if (Result.Error) + { + ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +logging::SinkPtr +SessionsServiceClient::CreateLogSink() +{ + std::string LogPath = m_SessionPath + "/log"; + return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); +} + +} // namespace zen |