From 3d59b5d7036c35fe484d052ff32dbdc9d0a75cf7 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 13 Apr 2026 19:17:09 +0200 Subject: fix utf characters in source code (#953) --- src/zenutil/sessionsclient.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/zenutil/sessionsclient.cpp') diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp index c62cc4099..df8853d20 100644 --- a/src/zenutil/sessionsclient.cpp +++ b/src/zenutil/sessionsclient.cpp @@ -21,7 +21,7 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // -// SessionLogSink — batching log sink that forwards to /sessions/{id}/log +// SessionLogSink - batching log sink that forwards to /sessions/{id}/log // static const char* @@ -108,7 +108,7 @@ public: void SetFormatter(std::unique_ptr /*InFormatter*/) override { - // No formatting needed — we send raw message text + // No formatting needed - we send raw message text } private: @@ -226,7 +226,7 @@ private: } catch (const std::exception&) { - // Best-effort — silently discard on failure + // Best-effort - silently discard on failure } } -- cgit v1.2.3 From 5a48e941b6f7e41ff6f0e86e6999f8b0a15d5c5b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 15 Apr 2026 19:12:44 +0200 Subject: add sessions to hub and proxy (#960) * move session service to zenserver base class and make it available in all zenserver modes * fix deadlock in sessionsclient shutdown --- src/zenutil/sessionsclient.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/zenutil/sessionsclient.cpp') diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp index df8853d20..ec9c6177a 100644 --- a/src/zenutil/sessionsclient.cpp +++ b/src/zenutil/sessionsclient.cpp @@ -124,6 +124,9 @@ private: { if (Msg.Type == BufferedLogEntry::Type::Shutdown) { + // Mark complete so WaitAndDequeue returns false on empty queue + m_Queue.CompleteAdding(); + // Drain remaining log entries BufferedLogEntry Remaining; while (m_Queue.WaitAndDequeue(Remaining)) @@ -172,7 +175,7 @@ private: { SendBatch(Batch); } - // Drain remaining + m_Queue.CompleteAdding(); while (m_Queue.WaitAndDequeue(Extra)) { if (Extra.Type == BufferedLogEntry::Type::Log) -- cgit v1.2.3 From 27d72af24a8de9a81500e68a0874f1430297b3bc Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 20 Apr 2026 23:52:38 +0200 Subject: Zen CLI common server interface (#920) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a common `ZenServiceClient` RAII wrapper for zen CLI commands that interact with a zenserver instance. CLI operations (admin, builds, cache, exec, hub, info, projectstore, trace, ui, version, vfs, workspaces) automatically register sessions so they become visible in the server's session list, and forward log output to the server's session log endpoint. All session HTTP I/O (announce, remove, log batches) runs on a single background worker thread, so CLI startup and shutdown never block on server availability. ### Key changes - **`ZenServiceClient`** — new RAII class that wraps host resolution, HTTP client creation, and session lifecycle (register on connect, remove on exit). Replaces ad-hoc boilerplate across all command files that talk to a server, including the new `trace` subcommands (`start`, `stop`, `status`). - **Async session I/O** — `SessionsServiceClient` now owns a single worker thread and command queue. `Announce()`, `Remove()`, and `UpdateMetadata()` enqueue commands and return immediately. The worker creates one `HttpClient` with a 5-second total timeout, bounding any individual request. Eliminates main-thread stalls when the server is unreachable. - **Session log forwarding** — `SessionLogSink` is a thin enqueuer that posts log messages to the same worker queue (no separate thread or HTTP client). Log levels are serialized as integers; the server-side ingest handles both string and integer formats for backwards compatibility, with bounds checking on integer values. - **Build & projectstore session registration** — Long-running `builds` and projectstore cache (oplog-download) connections register sessions too, making them visible alongside regular CLI command sessions. ### Cleanup - Extract `SetupCacheSession` helper on `StorageInstance` to reduce duplication. - Remove unused `HttpClient` reference in ui command. --- src/zenutil/sessionsclient.cpp | 461 +++++++++++++++++++++-------------------- 1 file changed, 231 insertions(+), 230 deletions(-) (limited to 'src/zenutil/sessionsclient.cpp') diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp index ec9c6177a..6ba997a62 100644 --- a/src/zenutil/sessionsclient.cpp +++ b/src/zenutil/sessionsclient.cpp @@ -2,15 +2,12 @@ #include -#include #include #include #include #include #include -#include -#include #include ZEN_THIRD_PARTY_INCLUDES_START @@ -21,89 +18,31 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // -// SessionLogSink - batching log sink that forwards to /sessions/{id}/log +// SessionLogSink — thin enqueuer that posts log messages to the +// SessionsServiceClient worker thread via its BlockingQueue. // -static const char* -LogLevelToString(logging::LogLevel Level) -{ - switch (Level) - { - case logging::Trace: - return "trace"; - case logging::Debug: - return "debug"; - case logging::Info: - return "info"; - case logging::Warn: - return "warn"; - case logging::Err: - return "error"; - case logging::Critical: - return "critical"; - default: - return "info"; - } -} - -struct BufferedLogEntry -{ - enum class Type : uint8_t - { - Log, - Flush, - Shutdown - }; - - Type Type = Type::Log; - std::string Level; - std::string Message; -}; - class SessionLogSink final : public logging::Sink { public: - SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) - { - HttpClientSettings Settings; - Settings.ConnectTimeout = std::chrono::milliseconds(3000); - m_Http = std::make_unique(std::move(TargetUrl), Settings); - - SetLevel(logging::Info); - - m_WorkerThread = std::thread([this]() { - zen::SetCurrentThreadName("SessionLog"); - WorkerLoop(); - }); - } + explicit SessionLogSink(BlockingQueue* Queue) : m_Queue(Queue) { SetLevel(logging::Info); } - ~SessionLogSink() override - { - BufferedLogEntry ShutdownMsg; - ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; - m_Queue.Enqueue(std::move(ShutdownMsg)); - - if (m_WorkerThread.joinable()) - { - m_WorkerThread.join(); - } - } + ~SessionLogSink() override = default; void Log(const logging::LogMessage& Msg) override { - BufferedLogEntry Entry; - Entry.Type = BufferedLogEntry::Type::Log; - Entry.Level = LogLevelToString(Msg.GetLevel()); - Entry.Message = std::string(Msg.GetPayload()); - m_Queue.Enqueue(std::move(Entry)); + SessionsServiceClient::SessionCommand Cmd; + Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::Log; + Cmd.LogLevel = Msg.GetLevel(); + Cmd.LogMessage = CompactString(Msg.GetPayload()); + m_Queue->Enqueue(std::move(Cmd)); } void Flush() override { - // Best-effort: enqueue a flush marker so the worker sends any pending entries - BufferedLogEntry FlushMsg; - FlushMsg.Type = BufferedLogEntry::Type::Flush; - m_Queue.Enqueue(std::move(FlushMsg)); + SessionsServiceClient::SessionCommand Cmd; + Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::FlushLogs; + m_Queue->Enqueue(std::move(Cmd)); } void SetFormatter(std::unique_ptr /*InFormatter*/) override @@ -112,144 +51,43 @@ public: } private: - static constexpr size_t BatchSize = 50; - - void WorkerLoop() - { - std::vector Batch; - Batch.reserve(BatchSize); - - BufferedLogEntry Msg; - while (m_Queue.WaitAndDequeue(Msg)) - { - if (Msg.Type == BufferedLogEntry::Type::Shutdown) - { - // Mark complete so WaitAndDequeue returns false on empty queue - m_Queue.CompleteAdding(); - - // Drain remaining log entries - BufferedLogEntry Remaining; - while (m_Queue.WaitAndDequeue(Remaining)) - { - if (Remaining.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Remaining)); - } - } - if (!Batch.empty()) - { - SendBatch(Batch); - } - return; - } - - if (Msg.Type == BufferedLogEntry::Type::Flush) - { - if (!Batch.empty()) - { - SendBatch(Batch); - Batch.clear(); - } - continue; - } - - // Log entry - Batch.push_back(std::move(Msg)); - - if (Batch.size() >= BatchSize) - { - SendBatch(Batch); - Batch.clear(); - } - else - { - // Drain any additional queued entries without blocking - while (Batch.size() < BatchSize && m_Queue.Size() > 0) - { - BufferedLogEntry Extra; - if (m_Queue.WaitAndDequeue(Extra)) - { - if (Extra.Type == BufferedLogEntry::Type::Shutdown) - { - if (!Batch.empty()) - { - SendBatch(Batch); - } - m_Queue.CompleteAdding(); - while (m_Queue.WaitAndDequeue(Extra)) - { - if (Extra.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Extra)); - } - } - if (!Batch.empty()) - { - SendBatch(Batch); - } - return; - } - if (Extra.Type == BufferedLogEntry::Type::Log) - { - Batch.push_back(std::move(Extra)); - } - else if (Extra.Type == BufferedLogEntry::Type::Flush) - { - break; - } - } - } - - if (!Batch.empty()) - { - SendBatch(Batch); - Batch.clear(); - } - } - } - } - - void SendBatch(const std::vector& Batch) - { - try - { - CbObjectWriter Writer; - Writer.BeginArray("entries"); - for (const BufferedLogEntry& Entry : Batch) - { - Writer.BeginObject(); - Writer << "level" << Entry.Level; - Writer << "message" << Entry.Message; - Writer.EndObject(); - } - Writer.EndArray(); - - HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); - (void)Result; // Best-effort - } - catch (const std::exception&) - { - // Best-effort - silently discard on failure - } - } - - std::string m_LogPath; - std::unique_ptr m_Http; - BlockingQueue m_Queue; - std::thread m_WorkerThread; + BlockingQueue* m_Queue; }; +////////////////////////////////////////////////////////////////////////// +// +// SessionsServiceClient +// + SessionsServiceClient::SessionsServiceClient(Options Opts) : m_Log(logging::Get("sessionsclient")) , m_Options(std::move(Opts)) -, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) +, m_SessionPath(fmt::format("/sessions/{}", m_Options.SessionId)) { - HttpClientSettings Settings; - Settings.ConnectTimeout = std::chrono::milliseconds(3000); - m_Http = std::make_unique(m_Options.TargetUrl, Settings); + // Strip trailing slash to avoid double-slash when appending paths like /sessions/{id} + while (m_Options.TargetUrl.ends_with('/')) + { + m_Options.TargetUrl.pop_back(); + } + + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("SessionIO"); + WorkerLoop(); + }); } -SessionsServiceClient::~SessionsServiceClient() = default; +SessionsServiceClient::~SessionsServiceClient() +{ + SessionCommand ShutdownCmd; + ShutdownCmd.CommandType = SessionCommand::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownCmd)); + m_Queue.CompleteAdding(); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } +} CbObject SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const @@ -264,21 +102,65 @@ SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const { Writer << "jobid" << m_Options.JobId; } - if (Metadata.GetSize() > 0) + if (Metadata) { Writer.AddObject("metadata", Metadata); } return Writer.Save(); } -bool +////////////////////////////////////////////////////////////////////////// +// Public API — non-blocking enqueuers + +void SessionsServiceClient::Announce(CbObjectView Metadata) +{ + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::Announce; + if (Metadata) + { + Cmd.Metadata = CbObject::Clone(Metadata); + } + m_Queue.Enqueue(std::move(Cmd)); +} + +void +SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +{ + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::UpdateMetadata; + if (Metadata) + { + Cmd.Metadata = CbObject::Clone(Metadata); + } + m_Queue.Enqueue(std::move(Cmd)); +} + +void +SessionsServiceClient::Remove() +{ + SessionCommand Cmd; + Cmd.CommandType = SessionCommand::Type::Remove; + m_Queue.Enqueue(std::move(Cmd)); +} + +logging::SinkPtr +SessionsServiceClient::CreateLogSink() +{ + return Ref(new SessionLogSink(&m_Queue)); +} + +////////////////////////////////////////////////////////////////////////// +// Worker thread — processes all session HTTP I/O + +void +SessionsServiceClient::DoAnnounce(HttpClient& Http, CbObjectView Metadata) { try { CbObject Body = BuildRequestBody(Metadata); - HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); + HttpClient::Response Result = Http.Post(m_SessionPath, std::move(Body)); if (Result.Error) { @@ -286,26 +168,24 @@ SessionsServiceClient::Announce(CbObjectView Metadata) m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); - return false; + return; } - ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); - return true; + ZEN_DEBUG("session announced to '{}'", m_Options.TargetUrl); } catch (const std::exception& Ex) { ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -bool -SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +void +SessionsServiceClient::DoUpdateMetadata(HttpClient& Http, CbObjectView Metadata) { try { @@ -314,7 +194,7 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) MemoryView View = Body.GetView(); IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); - HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); + HttpClient::Response Result = Http.Put(m_SessionPath, Payload); if (Result.Error) { @@ -322,29 +202,26 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); - return false; + return; } - - return true; } catch (const std::exception& Ex) { ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -bool -SessionsServiceClient::Remove() +void +SessionsServiceClient::DoRemove(HttpClient& Http) { try { - HttpClient::Response Result = m_Http->Delete(m_SessionPath); + HttpClient::Response Result = Http.Delete(m_SessionPath); if (Result.Error) { @@ -352,29 +229,153 @@ SessionsServiceClient::Remove() m_Options.TargetUrl, static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); - return false; + return; } if (!IsHttpOk(Result.StatusCode)) { ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast(Result.StatusCode)); - return false; + return; } - ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); - return true; + ZEN_DEBUG("session removed from '{}'", m_Options.TargetUrl); } catch (const std::exception& Ex) { ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); - return false; } } -logging::SinkPtr -SessionsServiceClient::CreateLogSink() +void +SessionsServiceClient::SendLogBatch(HttpClient& Http, const std::string& LogPath, const std::vector& Batch) { + try + { + CbObjectWriter Writer; + Writer.BeginArray("entries"); + for (const SessionCommand& Entry : Batch) + { + Writer.BeginObject(); + Writer << "level" << static_cast(Entry.LogLevel); + Writer << "message" << Entry.LogMessage.c_str(); + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = Http.Post(LogPath, Writer.Save()); + (void)Result; // Best-effort + } + catch (const std::exception&) + { + // Best-effort — silently discard on failure + } +} + +void +SessionsServiceClient::WorkerLoop() +{ + HttpClientSettings Settings = m_Options.ClientSettings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + Settings.Timeout = std::chrono::milliseconds(5000); + HttpClient Http(m_Options.TargetUrl, Settings); + std::string LogPath = m_SessionPath + "/log"; - return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); + bool Removed = false; + + static constexpr size_t BatchSize = 50; + + std::vector LogBatch; + LogBatch.reserve(BatchSize); + + auto FlushLogBatch = [&]() { + if (!LogBatch.empty()) + { + SendLogBatch(Http, LogPath, LogBatch); + LogBatch.clear(); + } + }; + + // Returns false to signal loop exit (Shutdown received) + auto ProcessCommand = [&](SessionCommand& Cmd) -> bool { + switch (Cmd.CommandType) + { + case SessionCommand::Type::Log: + LogBatch.push_back(std::move(Cmd)); + if (LogBatch.size() >= BatchSize) + { + FlushLogBatch(); + } + return true; + + case SessionCommand::Type::FlushLogs: + FlushLogBatch(); + return true; + + case SessionCommand::Type::Announce: + FlushLogBatch(); + DoAnnounce(Http, Cmd.Metadata); + return true; + + case SessionCommand::Type::UpdateMetadata: + FlushLogBatch(); + DoUpdateMetadata(Http, Cmd.Metadata); + return true; + + case SessionCommand::Type::Remove: + FlushLogBatch(); + if (!Removed) + { + Removed = true; + DoRemove(Http); + } + return true; + + case SessionCommand::Type::Shutdown: + { + // Drain remaining log entries from the queue + SessionCommand Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.CommandType == SessionCommand::Type::Log) + { + LogBatch.push_back(std::move(Remaining)); + } + } + FlushLogBatch(); + + if (!Removed) + { + Removed = true; + DoRemove(Http); + } + return false; + } + } + return true; + }; + + SessionCommand Cmd; + while (m_Queue.WaitAndDequeue(Cmd)) + { + if (!ProcessCommand(Cmd)) + { + return; + } + + // Drain additional queued entries without blocking (batching optimization) + while (LogBatch.size() < BatchSize && m_Queue.Size() > 0) + { + SessionCommand Extra; + if (m_Queue.WaitAndDequeue(Extra)) + { + if (!ProcessCommand(Extra)) + { + return; + } + } + } + + FlushLogBatch(); + } } } // namespace zen -- cgit v1.2.3