diff options
Diffstat (limited to 'src/zenutil/sessionsclient.cpp')
| -rw-r--r-- | src/zenutil/sessionsclient.cpp | 460 |
1 files changed, 232 insertions, 228 deletions
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp index c62cc4099..6ba997a62 100644 --- a/src/zenutil/sessionsclient.cpp +++ b/src/zenutil/sessionsclient.cpp @@ -2,15 +2,12 @@ #include <zenutil/sessionsclient.h> -#include <zencore/blockingqueue.h> #include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/logging/logmsg.h> #include <zencore/thread.h> -#include <zenhttp/httpclient.h> -#include <thread> #include <vector> ZEN_THIRD_PARTY_INCLUDES_START @@ -21,232 +18,76 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // -// SessionLogSink — batching log sink that forwards to /sessions/{id}/log +// SessionLogSink — thin enqueuer that posts log messages to the +// SessionsServiceClient worker thread via its BlockingQueue. // -static const char* -LogLevelToString(logging::LogLevel Level) -{ - switch (Level) - { - case logging::Trace: - return "trace"; - case logging::Debug: - return "debug"; - case logging::Info: - return "info"; - case logging::Warn: - return "warn"; - case logging::Err: - return "error"; - case logging::Critical: - return "critical"; - default: - return "info"; - } -} - -struct BufferedLogEntry -{ - enum class Type : uint8_t - { - Log, - Flush, - Shutdown - }; - - Type Type = Type::Log; - std::string Level; - std::string Message; -}; - class SessionLogSink final : public logging::Sink { public: - SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) - { - HttpClientSettings Settings; - Settings.ConnectTimeout = std::chrono::milliseconds(3000); - m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings); - - SetLevel(logging::Info); - - m_WorkerThread = std::thread([this]() { - zen::SetCurrentThreadName("SessionLog"); - WorkerLoop(); - }); - } + explicit SessionLogSink(BlockingQueue<SessionsServiceClient::SessionCommand>* Queue) : m_Queue(Queue) { SetLevel(logging::Info); } - ~SessionLogSink() override - { - BufferedLogEntry ShutdownMsg; - ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; - m_Queue.Enqueue(std::move(ShutdownMsg)); - - if (m_WorkerThread.joinable()) - { - m_WorkerThread.join(); - } - } + ~SessionLogSink() override = default; void Log(const logging::LogMessage& Msg) override { - BufferedLogEntry Entry; - Entry.Type = BufferedLogEntry::Type::Log; - Entry.Level = LogLevelToString(Msg.GetLevel()); - Entry.Message = std::string(Msg.GetPayload()); - m_Queue.Enqueue(std::move(Entry)); + SessionsServiceClient::SessionCommand Cmd; + Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::Log; + Cmd.LogLevel = Msg.GetLevel(); + Cmd.LogMessage = CompactString(Msg.GetPayload()); + m_Queue->Enqueue(std::move(Cmd)); } void Flush() override { - // Best-effort: enqueue a flush marker so the worker sends any pending entries - BufferedLogEntry FlushMsg; - FlushMsg.Type = BufferedLogEntry::Type::Flush; - m_Queue.Enqueue(std::move(FlushMsg)); + SessionsServiceClient::SessionCommand Cmd; + Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::FlushLogs; + m_Queue->Enqueue(std::move(Cmd)); } void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override { - // No formatting needed — we send raw message text + // No formatting needed - we send raw message text } private: - static constexpr size_t BatchSize = 50; - - void WorkerLoop() - { - std::vector<BufferedLogEntry> Batch; - Batch.reserve(BatchSize); - - BufferedLogEntry Msg; - while (m_Queue.WaitAndDequeue(Msg)) - { - if (Msg.Type == BufferedLogEntry::Type::Shutdown) - { - // Drain remaining log entries - BufferedLogEntry Remaining; - while (m_Queue.WaitAndDequeue(Remaining)) - { - if (Remaining.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Remaining)); - } - } - if (!Batch.empty()) - { - SendBatch(Batch); - } - return; - } - - if (Msg.Type == BufferedLogEntry::Type::Flush) - { - if (!Batch.empty()) - { - SendBatch(Batch); - Batch.clear(); - } - continue; - } - - // Log entry - Batch.push_back(std::move(Msg)); - - if (Batch.size() >= BatchSize) - { - SendBatch(Batch); - Batch.clear(); - } - else - { - // Drain any additional queued entries without blocking - while (Batch.size() < BatchSize && m_Queue.Size() > 0) - { - BufferedLogEntry Extra; - if (m_Queue.WaitAndDequeue(Extra)) - { - if (Extra.Type == BufferedLogEntry::Type::Shutdown) - { - if (!Batch.empty()) - { - SendBatch(Batch); - } - // Drain remaining - while (m_Queue.WaitAndDequeue(Extra)) - { - if (Extra.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Extra)); - } - } - if (!Batch.empty()) - { - SendBatch(Batch); - } - return; - } - if (Extra.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Extra)); - } - else if (Extra.Type == BufferedLogEntry::Type::Flush) - { - break; - } - } - } - - if (!Batch.empty()) - { - SendBatch(Batch); - Batch.clear(); - } - } - } - } - - void SendBatch(const std::vector<BufferedLogEntry>& Batch) - { - try - { - CbObjectWriter Writer; - Writer.BeginArray("entries"); - for (const BufferedLogEntry& Entry : Batch) - { - Writer.BeginObject(); - Writer << "level" << Entry.Level; - Writer << "message" << Entry.Message; - Writer.EndObject(); - } - Writer.EndArray(); - - HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); - (void)Result; // Best-effort - } - catch (const std::exception&) - { - // Best-effort — silently discard on failure - } - } - - std::string m_LogPath; - std::unique_ptr<HttpClient> m_Http; - BlockingQueue<BufferedLogEntry> m_Queue; - std::thread m_WorkerThread; + BlockingQueue<SessionsServiceClient::SessionCommand>* m_Queue; }; +////////////////////////////////////////////////////////////////////////// +// +// SessionsServiceClient +// + SessionsServiceClient::SessionsServiceClient(Options Opts) : m_Log(logging::Get("sessionsclient")) , m_Options(std::move(Opts)) -, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) +, m_SessionPath(fmt::format("/sessions/{}", m_Options.SessionId)) { - HttpClientSettings Settings; - Settings.ConnectTimeout = std::chrono::milliseconds(3000); - m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings); + // Strip trailing slash to avoid double-slash when appending paths like /sessions/{id} + while (m_Options.TargetUrl.ends_with('/')) + { + m_Options.TargetUrl.pop_back(); + } + + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("SessionIO"); + WorkerLoop(); + }); } -SessionsServiceClient::~SessionsServiceClient() = default; +SessionsServiceClient::~SessionsServiceClient() +{ + SessionCommand ShutdownCmd; + ShutdownCmd.CommandType = SessionCommand::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownCmd)); + m_Queue.CompleteAdding(); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } +} CbObject SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const @@ -261,21 +102,65 @@ SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const { Writer << "jobid" << m_Options.JobId; } - if (Metadata.GetSize() > 0) + if (Metadata) { Writer.AddObject("metadata", Metadata); } return Writer.Save(); } -bool +////////////////////////////////////////////////////////////////////////// +// Public API — non-blocking enqueuers + +void SessionsServiceClient::Announce(CbObjectView Metadata) { + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::Announce; + if (Metadata) + { + Cmd.Metadata = CbObject::Clone(Metadata); + } + m_Queue.Enqueue(std::move(Cmd)); +} + +void +SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +{ + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::UpdateMetadata; + if (Metadata) + { + Cmd.Metadata = CbObject::Clone(Metadata); + } + m_Queue.Enqueue(std::move(Cmd)); +} + +void +SessionsServiceClient::Remove() +{ + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::Remove; + m_Queue.Enqueue(std::move(Cmd)); +} + +logging::SinkPtr +SessionsServiceClient::CreateLogSink() +{ + return Ref(new SessionLogSink(&m_Queue)); +} + +////////////////////////////////////////////////////////////////////////// +// Worker thread — processes all session HTTP I/O + +void +SessionsServiceClient::DoAnnounce(HttpClient& Http, CbObjectView Metadata) +{ try { CbObject Body = BuildRequestBody(Metadata); - HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); + HttpClient::Response Result = Http.Post(m_SessionPath, std::move(Body)); if (Result.Error) { @@ -283,26 +168,24 @@ SessionsServiceClient::Announce(CbObjectView Metadata) m_Options.TargetUrl, static_cast<int>(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); - return false; + return; } - ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); - return true; + ZEN_DEBUG("session announced to '{}'", m_Options.TargetUrl); } catch (const std::exception& Ex) { ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -bool -SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +void +SessionsServiceClient::DoUpdateMetadata(HttpClient& Http, CbObjectView Metadata) { try { @@ -311,7 +194,7 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) MemoryView View = Body.GetView(); IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); - HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); + HttpClient::Response Result = Http.Put(m_SessionPath, Payload); if (Result.Error) { @@ -319,29 +202,26 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) m_Options.TargetUrl, static_cast<int>(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); - return false; + return; } - - return true; } catch (const std::exception& Ex) { ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -bool -SessionsServiceClient::Remove() +void +SessionsServiceClient::DoRemove(HttpClient& Http) { try { - HttpClient::Response Result = m_Http->Delete(m_SessionPath); + HttpClient::Response Result = Http.Delete(m_SessionPath); if (Result.Error) { @@ -349,29 +229,153 @@ SessionsServiceClient::Remove() m_Options.TargetUrl, static_cast<int>(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); - return false; + return; } - ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); - return true; + ZEN_DEBUG("session removed from '{}'", m_Options.TargetUrl); } catch (const std::exception& Ex) { ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -logging::SinkPtr -SessionsServiceClient::CreateLogSink() +void +SessionsServiceClient::SendLogBatch(HttpClient& Http, const std::string& LogPath, const std::vector<SessionCommand>& Batch) { + try + { + CbObjectWriter Writer; + Writer.BeginArray("entries"); + for (const SessionCommand& Entry : Batch) + { + Writer.BeginObject(); + Writer << "level" << static_cast<int32_t>(Entry.LogLevel); + Writer << "message" << Entry.LogMessage.c_str(); + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = Http.Post(LogPath, Writer.Save()); + (void)Result; // Best-effort + } + catch (const std::exception&) + { + // Best-effort — silently discard on failure + } +} + +void +SessionsServiceClient::WorkerLoop() +{ + HttpClientSettings Settings = m_Options.ClientSettings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + Settings.Timeout = std::chrono::milliseconds(5000); + HttpClient Http(m_Options.TargetUrl, Settings); + std::string LogPath = m_SessionPath + "/log"; - return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); + bool Removed = false; + + static constexpr size_t BatchSize = 50; + + std::vector<SessionCommand> LogBatch; + LogBatch.reserve(BatchSize); + + auto FlushLogBatch = [&]() { + if (!LogBatch.empty()) + { + SendLogBatch(Http, LogPath, LogBatch); + LogBatch.clear(); + } + }; + + // Returns false to signal loop exit (Shutdown received) + auto ProcessCommand = [&](SessionCommand& Cmd) -> bool { + switch (Cmd.CommandType) + { + case SessionCommand::Type::Log: + LogBatch.push_back(std::move(Cmd)); + if (LogBatch.size() >= BatchSize) + { + FlushLogBatch(); + } + return true; + + case SessionCommand::Type::FlushLogs: + FlushLogBatch(); + return true; + + case SessionCommand::Type::Announce: + FlushLogBatch(); + DoAnnounce(Http, Cmd.Metadata); + return true; + + case SessionCommand::Type::UpdateMetadata: + FlushLogBatch(); + DoUpdateMetadata(Http, Cmd.Metadata); + return true; + + case SessionCommand::Type::Remove: + FlushLogBatch(); + if (!Removed) + { + Removed = true; + DoRemove(Http); + } + return true; + + case SessionCommand::Type::Shutdown: + { + // Drain remaining log entries from the queue + SessionCommand Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.CommandType == SessionCommand::Type::Log) + { + LogBatch.push_back(std::move(Remaining)); + } + } + FlushLogBatch(); + + if (!Removed) + { + Removed = true; + DoRemove(Http); + } + return false; + } + } + return true; + }; + + SessionCommand Cmd; + while (m_Queue.WaitAndDequeue(Cmd)) + { + if (!ProcessCommand(Cmd)) + { + return; + } + + // Drain additional queued entries without blocking (batching optimization) + while (LogBatch.size() < BatchSize && m_Queue.Size() > 0) + { + SessionCommand Extra; + if (m_Queue.WaitAndDequeue(Extra)) + { + if (!ProcessCommand(Extra)) + { + return; + } + } + } + + FlushLogBatch(); + } } } // namespace zen |