aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-23 14:19:57 +0100
committerGitHub Enterprise <[email protected]>2026-03-23 14:19:57 +0100
commit2a445406e09328cb4cf320300f2678997d6775b7 (patch)
treea92f02d94c92144cb6ae32160397298533e4c822 /src/zenutil
parentadd hub instance crash recovery (#885) (diff)
downloadzen-2a445406e09328cb4cf320300f2678997d6775b7.tar.xz
zen-2a445406e09328cb4cf320300f2678997d6775b7.zip
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements. ### Sessions Service - `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle) - Storage server registers itself with its own local sessions service on startup - Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.) - Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All) - `--sessions-url` config option for remote session announcement - In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard ### Session Log Viewer - POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array - In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching - Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring - Auto-selects the server's own session on page load ### TCP Log Streaming - `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP - Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps - Gathered buffer writes to reduce syscall overhead when flushing batches - Tests covering basic delivery, multi-line splitting, drop detection, and sequencing ### New Dashboard Pages - **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session - **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`) - **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels - **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog ### Documentation Page - In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking - New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md` - Dev docs moved to `docs/dev/` ### Infrastructure & Bug Fixes - **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers - **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner - **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection - Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers - Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/include/zenutil/sessionsclient.h65
-rw-r--r--src/zenutil/include/zenutil/splitconsole/logstreamlistener.h27
-rw-r--r--src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h70
-rw-r--r--src/zenutil/sessionsclient.cpp377
-rw-r--r--src/zenutil/splitconsole/logstreamlistener.cpp230
-rw-r--r--src/zenutil/zenutil.cpp4
6 files changed, 676 insertions, 97 deletions
diff --git a/src/zenutil/include/zenutil/sessionsclient.h b/src/zenutil/include/zenutil/sessionsclient.h
new file mode 100644
index 000000000..aca45e61d
--- /dev/null
+++ b/src/zenutil/include/zenutil/sessionsclient.h
@@ -0,0 +1,65 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/logging.h>
+#include <zencore/logging/sink.h>
+
+#include <zencore/uid.h>
+
+#include <memory>
+#include <string>
+
+namespace zen {
+
+class HttpClient;
+
+/// Client for announcing and maintaining a session on a remote zenserver's /sessions/ endpoint.
+/// Follows the same best-effort pattern as ZenComputeServer's coordinator announce.
+class SessionsServiceClient
+{
+public:
+ struct Options
+ {
+ std::string TargetUrl; // Base URL of the target zenserver (e.g. "http://localhost:8558")
+ std::string AppName; // Application name to register
+ std::string Mode; // Server mode (e.g. "Server", "Compute", "Proxy")
+ Oid SessionId = Oid::Zero; // Session ID to register under
+ Oid JobId = Oid::Zero; // Optional job ID
+ };
+
+ explicit SessionsServiceClient(Options Opts);
+ ~SessionsServiceClient();
+
+ SessionsServiceClient(const SessionsServiceClient&) = delete;
+ SessionsServiceClient& operator=(const SessionsServiceClient&) = delete;
+
+ /// POST /sessions/{id} — register or re-announce the session with optional metadata.
+ [[nodiscard]] bool Announce(CbObjectView Metadata = {});
+
+ /// PUT /sessions/{id} — update metadata on an existing session.
+ [[nodiscard]] bool UpdateMetadata(CbObjectView Metadata = {});
+
+ /// DELETE /sessions/{id} — remove the session.
+ [[nodiscard]] bool Remove();
+
+ /// Create a logging sink that forwards log messages to the session's log endpoint.
+ /// The sink batches messages on a background thread and POSTs them periodically.
+ /// The returned sink can be added to any logger via Logger::AddSink().
+ logging::SinkPtr CreateLogSink();
+
+ const Options& GetOptions() const { return m_Options; }
+ const std::string& GetSessionPath() const { return m_SessionPath; }
+
+private:
+ CbObject BuildRequestBody(CbObjectView Metadata) const;
+
+ LoggerRef Log() { return m_Log; }
+ LoggerRef m_Log;
+ Options m_Options;
+ std::string m_SessionPath; // "sessions/<hex>"
+ std::unique_ptr<HttpClient> m_Http;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
index 06544308c..f3b960f51 100644
--- a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
+++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
@@ -6,7 +6,7 @@
#include <cstdint>
#include <memory>
-#include <string>
+#include <string_view>
ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/io_context.hpp>
@@ -14,44 +14,37 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-/// Interface for receiving log lines from a LogStreamListener.
-/// Clients implement this to route received log messages to their desired output.
-class LogStreamHandler
+/// Abstract target for log lines received over a TCP log stream.
+class LogStreamTarget
{
public:
- virtual ~LogStreamHandler() = default;
+ virtual ~LogStreamTarget() = default;
- virtual void AppendLogLine(std::string Line) = 0;
+ /// Called (potentially from any thread) when a log line is received.
+ virtual void AppendLogLine(std::string_view Text) = 0;
};
/// TCP listener that accepts connections from remote processes streaming log messages.
/// Each message is a CbObject with fields: "text" (string), "source" (string), "level" (string, optional).
///
-/// A LogStreamHandler can be set to receive parsed log lines. If no handler is set,
-/// received messages are silently discarded.
-///
/// Two modes of operation:
-/// - Owned thread: pass only Port; an internal IO thread is created.
+/// - Owned thread: pass only Target and Port; an internal IO thread is created.
/// - External io_context: pass an existing asio::io_context; no thread is created,
/// the caller is responsible for running the io_context.
class LogStreamListener
{
public:
/// Start listening with an internal IO thread.
- explicit LogStreamListener(uint16_t Port = 0);
+ LogStreamListener(LogStreamTarget& Target, uint16_t Port = 0);
/// Start listening on an externally-driven io_context (no thread created).
- LogStreamListener(asio::io_context& IoContext, uint16_t Port = 0);
+ LogStreamListener(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port = 0);
~LogStreamListener();
LogStreamListener(const LogStreamListener&) = delete;
LogStreamListener& operator=(const LogStreamListener&) = delete;
- /// Set the handler that will receive parsed log lines. May be called at any time.
- /// Pass nullptr to stop delivering messages.
- void SetHandler(LogStreamHandler* Handler);
-
/// Returns the actual port the listener is bound to.
uint16_t GetPort() const;
@@ -63,4 +56,6 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+void logstreamlistener_forcelink();
+
} // namespace zen
diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
index 2ab7d469e..f4ac5ff22 100644
--- a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
+++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
@@ -3,11 +3,11 @@
#pragma once
#include <zencore/compactbinarybuilder.h>
-#include <zencore/logging.h>
#include <zencore/logging/sink.h>
#include <zencore/thread.h>
ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_vector.h>
#include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <mutex>
#include <string>
#include <thread>
+#include <vector>
namespace zen {
@@ -52,22 +53,7 @@ public:
void Log(const logging::LogMessage& Msg) override
{
- logging::MemoryBuffer Formatted;
- {
- RwLock::SharedLockScope Lock(m_FormatterLock);
- if (m_Formatter)
- {
- m_Formatter->Format(Msg, Formatted);
- }
- else
- {
- // Fallback: use raw payload
- auto Payload = Msg.GetPayload();
- Formatted.append(Payload.data(), Payload.data() + Payload.size());
- }
- }
-
- std::string_view Text(Formatted.data(), Formatted.size());
+ std::string_view Text = Msg.GetPayload();
// Strip trailing newlines
while (!Text.empty() && (Text.back() == '\n' || Text.back() == '\r'))
@@ -75,20 +61,22 @@ public:
Text.remove_suffix(1);
}
- // Build CbObject with text, source, and level fields
+ uint64_t Seq = m_NextSequence.fetch_add(1, std::memory_order_relaxed);
+
+ // Build CbObject with text, source, level, and sequence number fields
CbObjectWriter Writer;
Writer.AddString("text", Text);
Writer.AddString("source", m_Source);
- Writer.AddString("level", ToStringView(Msg.GetLevel()));
+ Writer.AddString("level", logging::ToStringView(Msg.GetLevel()));
+ Writer.AddInteger("seq", Seq);
CbObject Obj = Writer.Save();
// Enqueue for async write
{
std::lock_guard<std::mutex> Lock(m_QueueMutex);
- if (m_Queue.size() >= m_MaxQueueSize)
+ while (m_Queue.size() >= m_MaxQueueSize)
{
m_Queue.pop_front();
- m_DroppedMessages.fetch_add(1, std::memory_order_relaxed);
}
m_Queue.push_back(std::move(Obj));
}
@@ -100,10 +88,9 @@ public:
// Nothing to flush — writes happen asynchronously
}
- void SetFormatter(std::unique_ptr<logging::Formatter> InFormatter) override
+ void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override
{
- RwLock::ExclusiveLockScope Lock(m_FormatterLock);
- m_Formatter = std::move(InFormatter);
+ // Not used — we output the raw payload directly
}
private:
@@ -131,13 +118,6 @@ private:
Batch.swap(m_Queue);
}
- uint32_t Dropped = m_DroppedMessages.exchange(0, std::memory_order_relaxed);
- if (Dropped > 0)
- {
- // We could/should log here, but that could cause a feedback loop which
- // would trigger subsequent dropped message warnings
- }
-
if (!m_Connected && !Connect())
{
if (m_Stopping)
@@ -147,16 +127,21 @@ private:
continue; // drop batch — will retry on next batch
}
+ // Build a gathered buffer sequence so the entire batch is written
+ // in a single socket operation (or as few as the OS needs).
+ eastl::fixed_vector<asio::const_buffer, 64> Buffers;
+ Buffers.reserve(Batch.size());
for (auto& Obj : Batch)
{
- MemoryView View = Obj.GetView();
- asio::error_code Ec;
- asio::write(m_Socket, asio::buffer(View.GetData(), View.GetSize()), Ec);
- if (Ec)
- {
- m_Connected = false;
- break; // drop remaining messages in batch
- }
+ MemoryView View = Obj.GetView();
+ Buffers.emplace_back(View.GetData(), View.GetSize());
+ }
+
+ asio::error_code Ec;
+ asio::write(m_Socket, Buffers, Ec);
+ if (Ec)
+ {
+ m_Connected = false;
}
}
}
@@ -191,15 +176,14 @@ private:
std::string m_Source;
uint32_t m_MaxQueueSize;
- // Formatter (protected by RwLock since Log is called from multiple threads)
- RwLock m_FormatterLock;
- std::unique_ptr<logging::Formatter> m_Formatter;
+ // Sequence counter — incremented atomically by Log() callers.
+ // Gaps in the sequence seen by the receiver indicate dropped messages.
+ std::atomic<uint64_t> m_NextSequence{0};
// Queue shared between Log() callers and IO thread
std::mutex m_QueueMutex;
std::condition_variable m_QueueCv;
std::deque<CbObject> m_Queue;
- std::atomic<uint32_t> m_DroppedMessages{0};
bool m_Stopping = false;
std::chrono::steady_clock::time_point m_DrainDeadline;
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp
new file mode 100644
index 000000000..c62cc4099
--- /dev/null
+++ b/src/zenutil/sessionsclient.cpp
@@ -0,0 +1,377 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/sessionsclient.h>
+
+#include <zencore/blockingqueue.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging/logmsg.h>
+#include <zencore/thread.h>
+#include <zenhttp/httpclient.h>
+
+#include <thread>
+#include <vector>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// SessionLogSink — batching log sink that forwards to /sessions/{id}/log
+//
+
+static const char*
+LogLevelToString(logging::LogLevel Level)
+{
+ switch (Level)
+ {
+ case logging::Trace:
+ return "trace";
+ case logging::Debug:
+ return "debug";
+ case logging::Info:
+ return "info";
+ case logging::Warn:
+ return "warn";
+ case logging::Err:
+ return "error";
+ case logging::Critical:
+ return "critical";
+ default:
+ return "info";
+ }
+}
+
+struct BufferedLogEntry
+{
+ enum class Type : uint8_t
+ {
+ Log,
+ Flush,
+ Shutdown
+ };
+
+ Type Type = Type::Log;
+ std::string Level;
+ std::string Message;
+};
+
+class SessionLogSink final : public logging::Sink
+{
+public:
+ SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath))
+ {
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(3000);
+ m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings);
+
+ SetLevel(logging::Info);
+
+ m_WorkerThread = std::thread([this]() {
+ zen::SetCurrentThreadName("SessionLog");
+ WorkerLoop();
+ });
+ }
+
+ ~SessionLogSink() override
+ {
+ BufferedLogEntry ShutdownMsg;
+ ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown;
+ m_Queue.Enqueue(std::move(ShutdownMsg));
+
+ if (m_WorkerThread.joinable())
+ {
+ m_WorkerThread.join();
+ }
+ }
+
+ void Log(const logging::LogMessage& Msg) override
+ {
+ BufferedLogEntry Entry;
+ Entry.Type = BufferedLogEntry::Type::Log;
+ Entry.Level = LogLevelToString(Msg.GetLevel());
+ Entry.Message = std::string(Msg.GetPayload());
+ m_Queue.Enqueue(std::move(Entry));
+ }
+
+ void Flush() override
+ {
+ // Best-effort: enqueue a flush marker so the worker sends any pending entries
+ BufferedLogEntry FlushMsg;
+ FlushMsg.Type = BufferedLogEntry::Type::Flush;
+ m_Queue.Enqueue(std::move(FlushMsg));
+ }
+
+ void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override
+ {
+ // No formatting needed — we send raw message text
+ }
+
+private:
+ static constexpr size_t BatchSize = 50;
+
+ void WorkerLoop()
+ {
+ std::vector<BufferedLogEntry> Batch;
+ Batch.reserve(BatchSize);
+
+ BufferedLogEntry Msg;
+ while (m_Queue.WaitAndDequeue(Msg))
+ {
+ if (Msg.Type == BufferedLogEntry::Type::Shutdown)
+ {
+ // Drain remaining log entries
+ BufferedLogEntry Remaining;
+ while (m_Queue.WaitAndDequeue(Remaining))
+ {
+ if (Remaining.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Remaining));
+ }
+ }
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ return;
+ }
+
+ if (Msg.Type == BufferedLogEntry::Type::Flush)
+ {
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ continue;
+ }
+
+ // Log entry
+ Batch.push_back(std::move(Msg));
+
+ if (Batch.size() >= BatchSize)
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ else
+ {
+ // Drain any additional queued entries without blocking
+ while (Batch.size() < BatchSize && m_Queue.Size() > 0)
+ {
+ BufferedLogEntry Extra;
+ if (m_Queue.WaitAndDequeue(Extra))
+ {
+ if (Extra.Type == BufferedLogEntry::Type::Shutdown)
+ {
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ // Drain remaining
+ while (m_Queue.WaitAndDequeue(Extra))
+ {
+ if (Extra.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Extra));
+ }
+ }
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ }
+ return;
+ }
+ if (Extra.Type == BufferedLogEntry::Type::Log)
+ {
+ Batch.push_back(std::move(Extra));
+ }
+ else if (Extra.Type == BufferedLogEntry::Type::Flush)
+ {
+ break;
+ }
+ }
+ }
+
+ if (!Batch.empty())
+ {
+ SendBatch(Batch);
+ Batch.clear();
+ }
+ }
+ }
+ }
+
+ void SendBatch(const std::vector<BufferedLogEntry>& Batch)
+ {
+ try
+ {
+ CbObjectWriter Writer;
+ Writer.BeginArray("entries");
+ for (const BufferedLogEntry& Entry : Batch)
+ {
+ Writer.BeginObject();
+ Writer << "level" << Entry.Level;
+ Writer << "message" << Entry.Message;
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save());
+ (void)Result; // Best-effort
+ }
+ catch (const std::exception&)
+ {
+ // Best-effort — silently discard on failure
+ }
+ }
+
+ std::string m_LogPath;
+ std::unique_ptr<HttpClient> m_Http;
+ BlockingQueue<BufferedLogEntry> m_Queue;
+ std::thread m_WorkerThread;
+};
+
+SessionsServiceClient::SessionsServiceClient(Options Opts)
+: m_Log(logging::Get("sessionsclient"))
+, m_Options(std::move(Opts))
+, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId))
+{
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(3000);
+ m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings);
+}
+
+SessionsServiceClient::~SessionsServiceClient() = default;
+
+CbObject
+SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const
+{
+ CbObjectWriter Writer;
+ Writer << "appname" << m_Options.AppName;
+ if (!m_Options.Mode.empty())
+ {
+ Writer << "mode" << m_Options.Mode;
+ }
+ if (m_Options.JobId != Oid::Zero)
+ {
+ Writer << "jobid" << m_Options.JobId;
+ }
+ if (Metadata.GetSize() > 0)
+ {
+ Writer.AddObject("metadata", Metadata);
+ }
+ return Writer.Save();
+}
+
+bool
+SessionsServiceClient::Announce(CbObjectView Metadata)
+{
+ try
+ {
+ CbObject Body = BuildRequestBody(Metadata);
+
+ HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body));
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ ZEN_INFO("session announced to '{}'", m_Options.TargetUrl);
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+bool
+SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
+{
+ try
+ {
+ CbObject Body = BuildRequestBody(Metadata);
+
+ MemoryView View = Body.GetView();
+ IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject);
+
+ HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload);
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+bool
+SessionsServiceClient::Remove()
+{
+ try
+ {
+ HttpClient::Response Result = m_Http->Delete(m_SessionPath);
+
+ if (Result.Error)
+ {
+ ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}",
+ m_Options.TargetUrl,
+ static_cast<int>(Result.Error->ErrorCode),
+ Result.Error->ErrorMessage);
+ return false;
+ }
+ if (!IsHttpOk(Result.StatusCode))
+ {
+ ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
+ return false;
+ }
+
+ ZEN_INFO("session removed from '{}'", m_Options.TargetUrl);
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what());
+ return false;
+ }
+}
+
+logging::SinkPtr
+SessionsServiceClient::CreateLogSink()
+{
+ std::string LogPath = m_SessionPath + "/log";
+ return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath)));
+}
+
+} // namespace zen
diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp
index 9f1d1a02c..04718b543 100644
--- a/src/zenutil/splitconsole/logstreamlistener.cpp
+++ b/src/zenutil/splitconsole/logstreamlistener.cpp
@@ -2,6 +2,7 @@
#include <zenutil/splitconsole/logstreamlistener.h>
+#include <zenbase/refcount.h>
#include <zencore/compactbinary.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -11,7 +12,6 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
-#include <atomic>
#include <vector>
namespace zen {
@@ -19,21 +19,17 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
// LogStreamSession — reads CbObject-framed messages from a single TCP connection
-class LogStreamSession : public std::enable_shared_from_this<LogStreamSession>
+class LogStreamSession : public RefCounted
{
public:
- LogStreamSession(asio::ip::tcp::socket Socket, std::atomic<LogStreamHandler*>& Handler)
- : m_Socket(std::move(Socket))
- , m_Handler(Handler)
- {
- }
+ LogStreamSession(asio::ip::tcp::socket Socket, LogStreamTarget& Target) : m_Socket(std::move(Socket)), m_Target(Target) {}
void Start() { DoRead(); }
private:
void DoRead()
{
- auto Self = shared_from_this();
+ Ref<LogStreamSession> Self(this);
m_Socket.async_read_some(asio::buffer(m_ReadBuf.data() + m_BufferUsed, m_ReadBuf.size() - m_BufferUsed),
[Self](const asio::error_code& Ec, std::size_t BytesRead) {
if (Ec)
@@ -71,8 +67,17 @@ private:
std::string_view Text = Obj["text"].AsString();
std::string_view Source = Obj["source"].AsString();
+ // Check sequence number for gaps (dropped messages)
+ uint64_t Seq = Obj["seq"].AsUInt64();
+ if (Seq > m_NextExpectedSeq)
+ {
+ uint64_t Dropped = Seq - m_NextExpectedSeq;
+ m_Target.AppendLogLine(fmt::format("[{}] *** {} log message(s) dropped ***", Source.empty() ? "log" : Source, Dropped));
+ }
+ m_NextExpectedSeq = Seq + 1;
+
// Split multi-line messages into individual AppendLogLine calls so that
- // each line gets its own row in the SplitConsole ring buffer.
+ // each line gets its own row in the target's log output.
while (!Text.empty())
{
std::string_view Line = Text;
@@ -98,17 +103,13 @@ private:
continue;
}
- LogStreamHandler* Handler = m_Handler.load(std::memory_order_acquire);
- if (Handler)
+ if (!Source.empty())
{
- if (!Source.empty())
- {
- Handler->AppendLogLine(fmt::format("[{}] {}", Source, Line));
- }
- else
- {
- Handler->AppendLogLine(std::string(Line));
- }
+ m_Target.AppendLogLine(fmt::format("[{}] {}", Source, Line));
+ }
+ else
+ {
+ m_Target.AppendLogLine(Line);
}
}
@@ -128,10 +129,11 @@ private:
}
}
- asio::ip::tcp::socket m_Socket;
- std::atomic<LogStreamHandler*>& m_Handler;
- std::array<uint8_t, 65536> m_ReadBuf{};
- std::size_t m_BufferUsed = 0;
+ asio::ip::tcp::socket m_Socket;
+ LogStreamTarget& m_Target;
+ std::array<uint8_t, 65536> m_ReadBuf{};
+ std::size_t m_BufferUsed = 0;
+ uint64_t m_NextExpectedSeq = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -140,7 +142,10 @@ private:
struct LogStreamListener::Impl
{
// Owned io_context mode — creates and runs its own thread
- explicit Impl(uint16_t Port) : m_OwnedIoContext(std::make_unique<asio::io_context>()), m_Acceptor(*m_OwnedIoContext)
+ Impl(LogStreamTarget& Target, uint16_t Port)
+ : m_Target(Target)
+ , m_OwnedIoContext(std::make_unique<asio::io_context>())
+ , m_Acceptor(*m_OwnedIoContext)
{
SetupAcceptor(Port);
m_IoThread = std::thread([this]() {
@@ -150,7 +155,10 @@ struct LogStreamListener::Impl
}
// External io_context mode — caller drives the io_context
- Impl(asio::io_context& IoContext, uint16_t Port) : m_Acceptor(IoContext) { SetupAcceptor(Port); }
+ Impl(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port) : m_Target(Target), m_Acceptor(IoContext)
+ {
+ SetupAcceptor(Port);
+ }
~Impl() { Shutdown(); }
@@ -177,11 +185,12 @@ struct LogStreamListener::Impl
uint16_t GetPort() const { return m_Port; }
- void SetHandler(LogStreamHandler* Handler) { m_Handler.store(Handler, std::memory_order_release); }
-
private:
void SetupAcceptor(uint16_t Port)
{
+ auto& IoCtx = m_OwnedIoContext ? *m_OwnedIoContext : m_Acceptor.get_executor().context();
+ ZEN_UNUSED(IoCtx);
+
// Try dual-stack IPv6 first (accepts both IPv4 and IPv6), fall back to IPv4-only
asio::error_code Ec;
m_Acceptor.open(asio::ip::tcp::v6(), Ec);
@@ -218,7 +227,7 @@ private:
return; // acceptor closed
}
- auto Session = std::make_shared<LogStreamSession>(std::move(Socket), m_Handler);
+ Ref<LogStreamSession> Session(new LogStreamSession(std::move(Socket), m_Target));
Session->Start();
if (!m_Stopped.load())
@@ -228,7 +237,7 @@ private:
});
}
- std::atomic<LogStreamHandler*> m_Handler{nullptr};
+ LogStreamTarget& m_Target;
std::unique_ptr<asio::io_context> m_OwnedIoContext; // null when using external io_context
asio::ip::tcp::acceptor m_Acceptor;
std::thread m_IoThread;
@@ -239,22 +248,17 @@ private:
//////////////////////////////////////////////////////////////////////////
// LogStreamListener
-LogStreamListener::LogStreamListener(uint16_t Port) : m_Impl(std::make_unique<Impl>(Port))
+LogStreamListener::LogStreamListener(LogStreamTarget& Target, uint16_t Port) : m_Impl(std::make_unique<Impl>(Target, Port))
{
}
-LogStreamListener::LogStreamListener(asio::io_context& IoContext, uint16_t Port) : m_Impl(std::make_unique<Impl>(IoContext, Port))
+LogStreamListener::LogStreamListener(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port)
+: m_Impl(std::make_unique<Impl>(Target, IoContext, Port))
{
}
LogStreamListener::~LogStreamListener() = default;
-void
-LogStreamListener::SetHandler(LogStreamHandler* Handler)
-{
- m_Impl->SetHandler(Handler);
-}
-
uint16_t
LogStreamListener::GetPort() const
{
@@ -268,3 +272,155 @@ LogStreamListener::Shutdown()
}
} // namespace zen
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/testing.h>
+# include <zenutil/splitconsole/tcplogstreamsink.h>
+
+namespace zen {
+
+void
+logstreamlistener_forcelink()
+{
+}
+
+namespace {
+
+ class CollectingTarget : public LogStreamTarget
+ {
+ public:
+ void AppendLogLine(std::string_view Text) override
+ {
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+ m_Lines.emplace_back(Text);
+ m_Cv.notify_all();
+ }
+
+ std::vector<std::string> WaitForLines(size_t Count, std::chrono::milliseconds Timeout = std::chrono::milliseconds(5000))
+ {
+ std::unique_lock<std::mutex> Lock(m_Mutex);
+ m_Cv.wait_for(Lock, Timeout, [&]() { return m_Lines.size() >= Count; });
+ return m_Lines;
+ }
+
+ private:
+ std::mutex m_Mutex;
+ std::condition_variable m_Cv;
+ std::vector<std::string> m_Lines;
+ };
+
+ logging::LogMessage MakeLogMessage(std::string_view Text, logging::LogLevel Level = logging::Info)
+ {
+ static logging::LogPoint Point{{}, Level, {}};
+ Point.Level = Level;
+ return logging::LogMessage(Point, "test", Text);
+ }
+
+} // namespace
+
+TEST_SUITE_BEGIN("util.logstreamlistener");
+
+TEST_CASE("BasicMessageDelivery")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "TestSource", 64);
+ Sink.Log(MakeLogMessage("hello world"));
+ Sink.Log(MakeLogMessage("second line"));
+ }
+
+ auto Lines = Target.WaitForLines(2);
+ REQUIRE(Lines.size() == 2);
+ CHECK(Lines[0] == "[TestSource] hello world");
+ CHECK(Lines[1] == "[TestSource] second line");
+}
+
+TEST_CASE("MultiLineMessageSplit")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 64);
+ Sink.Log(MakeLogMessage("line1\nline2\nline3"));
+ }
+
+ auto Lines = Target.WaitForLines(3);
+ REQUIRE(Lines.size() == 3);
+ CHECK(Lines[0] == "[src] line1");
+ CHECK(Lines[1] == "[src] line2");
+ CHECK(Lines[2] == "[src] line3");
+}
+
+TEST_CASE("DroppedMessageDetection")
+{
+ // Test sequence-gap detection deterministically by sending raw CbObjects
+ // with an explicit gap in sequence numbers, bypassing TcpLogStreamSink.
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ {
+ asio::io_context IoContext;
+ asio::ip::tcp::socket Socket(IoContext);
+ Socket.connect(asio::ip::tcp::endpoint(asio::ip::make_address("127.0.0.1"), Listener.GetPort()));
+
+ // Send seq=0, then seq=5 — the listener should detect a gap of 4
+ for (uint64_t Seq : {uint64_t(0), uint64_t(5)})
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("text", fmt::format("msg{}", Seq));
+ Writer.AddString("source", "src");
+ Writer.AddInteger("seq", Seq);
+ CbObject Obj = Writer.Save();
+ MemoryView View = Obj.GetView();
+
+ asio::write(Socket, asio::buffer(View.GetData(), View.GetSize()));
+ }
+ }
+
+ // Expect: msg0, drop notice, msg5
+ auto Lines = Target.WaitForLines(3);
+ REQUIRE(Lines.size() >= 3);
+ CHECK(Lines[0] == "[src] msg0");
+ CHECK(Lines[1].find("4 log message(s) dropped") != std::string::npos);
+ CHECK(Lines[2] == "[src] msg5");
+}
+
+TEST_CASE("SequenceNumbersAreContiguous")
+{
+ CollectingTarget Target;
+ LogStreamListener Listener(Target);
+
+ constexpr int NumMessages = 5;
+ {
+ TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "seq", 64);
+ for (int i = 0; i < NumMessages; i++)
+ {
+ Sink.Log(MakeLogMessage(fmt::format("msg{}", i)));
+ }
+ }
+
+ auto Lines = Target.WaitForLines(NumMessages);
+ REQUIRE(Lines.size() == NumMessages);
+
+ // No "dropped" notices should appear when nothing is dropped
+ for (auto& Line : Lines)
+ {
+ CHECK(Line.find("dropped") == std::string::npos);
+ }
+
+ // Verify ordering
+ for (int i = 0; i < NumMessages; i++)
+ {
+ CHECK(Lines[i] == fmt::format("[seq] msg{}", i));
+ }
+}
+
+TEST_SUITE_END();
+
+} // namespace zen
+
+#endif
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 734813b69..c4d01554d 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -7,8 +7,9 @@
# include <zenutil/cloud/imdscredentials.h>
# include <zenutil/cloud/s3client.h>
# include <zenutil/cloud/sigv4.h>
-# include <zenutil/rpcrecording.h>
# include <zenutil/config/commandlineoptions.h>
+# include <zenutil/rpcrecording.h>
+# include <zenutil/splitconsole/logstreamlistener.h>
# include <zenutil/wildcard.h>
namespace zen {
@@ -19,6 +20,7 @@ zenutil_forcelinktests()
cache::rpcrecord_forcelink();
commandlineoptions_forcelink();
imdscredentials_forcelink();
+ logstreamlistener_forcelink();
s3client_forcelink();
sigv4_forcelink();
wildcard_forcelink();