| author | Stefan Boberg <[email protected]> | 2026-03-23 14:19:57 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 14:19:57 +0100 |
| commit | 2a445406e09328cb4cf320300f2678997d6775b7 (patch) | |
| tree | a92f02d94c92144cb6ae32160397298533e4c822 /src/zenutil | |
| parent | add hub instance crash recovery (#885) (diff) | |
Dashboard refresh (logs, storage, network, object store, docs) (#835)
## Summary
This PR adds a session management service, several new dashboard pages, and a number of infrastructure improvements.
### Sessions Service
- `SessionsServiceClient` in `zenutil` announces sessions to a remote zenserver with a 15s heartbeat (POST/PUT/DELETE lifecycle)
- Storage server registers itself with its own local sessions service on startup
- Session mode attribute coupled to server mode (Compute, Proxy, Hub, etc.)
- Ended sessions tracked with `ended_at` timestamp; status filtering (Active/Ended/All)
- `--sessions-url` config option for remote session announcement
- In-process log sink (`InProcSessionLogSink`) forwards server log output to the server's own session, visible in the dashboard
### Session Log Viewer
- POST/GET endpoints for session logs (`/sessions/{id}/log`) supporting raw text and structured JSON/CbObject with batch `entries` array
- In-memory log storage per session (capped at 10k entries) with cursor-based pagination for efficient incremental fetching
- Log panel in the sessions dashboard with incremental DOM updates, auto-scroll (Follow toggle), newest-first toggle, text filter, and log-level coloring
- Auto-selects the server's own session on page load
### TCP Log Streaming
- `LogStreamListener` and `TcpLogStreamSink` for log delivery over TCP
- Sequence numbers on each message with drop detection and synthetic "dropped" notice on gaps
- Gathered buffer writes to reduce syscall overhead when flushing batches
- Tests covering basic delivery, multi-line splitting, drop detection, and sequencing
### New Dashboard Pages
- **Sessions**: master-detail layout with selectable rows, metadata panel, live WebSocket updates, paging, abbreviated date formatting, and "this" pill for the local session
- **Object Store**: summary stats tiles and bucket table with click-to-expand inline object listing (`GET /obj/`)
- **Storage**: per-volume disk usage breakdown (`GET /admin/storage`), Garbage Collection status section (next-run countdown, last-run stats), and GC History table with paginated rows and expandable detail panels
- **Network**: overview tiles, per-service request table, proxy connections, and live WebSocket updates; distinct client IPs and session counts via HyperLogLog
### Documentation Page
- In-dashboard Docs page with sidebar navigation, markdown rendering (via `marked`), Mermaid diagram support (theme-aware), collapsible sections, text filtering with highlighting, and cross-document linking
- New user-facing docs: `overview.md` (with architecture and per-mode diagrams), `sessions.md`, `cache.md`, `projects.md`; updated `compute.md`
- Dev docs moved to `docs/dev/`
### Infrastructure & Bug Fixes
- **Deflate compression** for the embedded frontend zip (~3.4MB → ~950KB); zlib inflate support added to `ZipFs` with cached decompressed buffers
- **Local IP addresses**: `GetLocalIpAddresses()` (Windows via `GetAdaptersAddresses`, Linux/Mac via `getifaddrs`); surfaced in `/status/status`, `/health/info`, and the dashboard banner
- **Dashboard nav**: unified into `zen-nav` web component with `MutationObserver` for dynamically added links, CSS `::part()` to merge banner/nav border radii, and prefix-based active link detection
- Stats broadcast refactored from manual JSON string concatenation to `CbObjectWriter`; `CbObject`-to-JS conversion improved for `TimeSpan`, `DateTime`, and large integers
- Stats WebSocket boilerplate consolidated into `ZenPage.connect_stats_ws()`
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/include/zenutil/sessionsclient.h | 65 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/splitconsole/logstreamlistener.h | 27 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h | 70 | ||||
| -rw-r--r-- | src/zenutil/sessionsclient.cpp | 377 | ||||
| -rw-r--r-- | src/zenutil/splitconsole/logstreamlistener.cpp | 230 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 4 |
6 files changed, 676 insertions, 97 deletions
diff --git a/src/zenutil/include/zenutil/sessionsclient.h b/src/zenutil/include/zenutil/sessionsclient.h new file mode 100644 index 000000000..aca45e61d --- /dev/null +++ b/src/zenutil/include/zenutil/sessionsclient.h @@ -0,0 +1,65 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/logging.h> +#include <zencore/logging/sink.h> + +#include <zencore/uid.h> + +#include <memory> +#include <string> + +namespace zen { + +class HttpClient; + +/// Client for announcing and maintaining a session on a remote zenserver's /sessions/ endpoint. +/// Follows the same best-effort pattern as ZenComputeServer's coordinator announce. +class SessionsServiceClient +{ +public: + struct Options + { + std::string TargetUrl; // Base URL of the target zenserver (e.g. "http://localhost:8558") + std::string AppName; // Application name to register + std::string Mode; // Server mode (e.g. "Server", "Compute", "Proxy") + Oid SessionId = Oid::Zero; // Session ID to register under + Oid JobId = Oid::Zero; // Optional job ID + }; + + explicit SessionsServiceClient(Options Opts); + ~SessionsServiceClient(); + + SessionsServiceClient(const SessionsServiceClient&) = delete; + SessionsServiceClient& operator=(const SessionsServiceClient&) = delete; + + /// POST /sessions/{id} — register or re-announce the session with optional metadata. + [[nodiscard]] bool Announce(CbObjectView Metadata = {}); + + /// PUT /sessions/{id} — update metadata on an existing session. + [[nodiscard]] bool UpdateMetadata(CbObjectView Metadata = {}); + + /// DELETE /sessions/{id} — remove the session. + [[nodiscard]] bool Remove(); + + /// Create a logging sink that forwards log messages to the session's log endpoint. + /// The sink batches messages on a background thread and POSTs them periodically. + /// The returned sink can be added to any logger via Logger::AddSink(). 
+ logging::SinkPtr CreateLogSink(); + + const Options& GetOptions() const { return m_Options; } + const std::string& GetSessionPath() const { return m_SessionPath; } + +private: + CbObject BuildRequestBody(CbObjectView Metadata) const; + + LoggerRef Log() { return m_Log; } + LoggerRef m_Log; + Options m_Options; + std::string m_SessionPath; // "sessions/<hex>" + std::unique_ptr<HttpClient> m_Http; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h index 06544308c..f3b960f51 100644 --- a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h +++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h @@ -6,7 +6,7 @@ #include <cstdint> #include <memory> -#include <string> +#include <string_view> ZEN_THIRD_PARTY_INCLUDES_START #include <asio/io_context.hpp> @@ -14,44 +14,37 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -/// Interface for receiving log lines from a LogStreamListener. -/// Clients implement this to route received log messages to their desired output. -class LogStreamHandler +/// Abstract target for log lines received over a TCP log stream. +class LogStreamTarget { public: - virtual ~LogStreamHandler() = default; + virtual ~LogStreamTarget() = default; - virtual void AppendLogLine(std::string Line) = 0; + /// Called (potentially from any thread) when a log line is received. + virtual void AppendLogLine(std::string_view Text) = 0; }; /// TCP listener that accepts connections from remote processes streaming log messages. /// Each message is a CbObject with fields: "text" (string), "source" (string), "level" (string, optional). /// -/// A LogStreamHandler can be set to receive parsed log lines. If no handler is set, -/// received messages are silently discarded. -/// /// Two modes of operation: -/// - Owned thread: pass only Port; an internal IO thread is created. 
+/// - Owned thread: pass only Target and Port; an internal IO thread is created. /// - External io_context: pass an existing asio::io_context; no thread is created, /// the caller is responsible for running the io_context. class LogStreamListener { public: /// Start listening with an internal IO thread. - explicit LogStreamListener(uint16_t Port = 0); + LogStreamListener(LogStreamTarget& Target, uint16_t Port = 0); /// Start listening on an externally-driven io_context (no thread created). - LogStreamListener(asio::io_context& IoContext, uint16_t Port = 0); + LogStreamListener(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port = 0); ~LogStreamListener(); LogStreamListener(const LogStreamListener&) = delete; LogStreamListener& operator=(const LogStreamListener&) = delete; - /// Set the handler that will receive parsed log lines. May be called at any time. - /// Pass nullptr to stop delivering messages. - void SetHandler(LogStreamHandler* Handler); - /// Returns the actual port the listener is bound to. 
uint16_t GetPort() const; @@ -63,4 +56,6 @@ private: std::unique_ptr<Impl> m_Impl; }; +void logstreamlistener_forcelink(); + } // namespace zen diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h index 2ab7d469e..f4ac5ff22 100644 --- a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h +++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h @@ -3,11 +3,11 @@ #pragma once #include <zencore/compactbinarybuilder.h> -#include <zencore/logging.h> #include <zencore/logging/sink.h> #include <zencore/thread.h> ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_vector.h> #include <asio.hpp> ZEN_THIRD_PARTY_INCLUDES_END @@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <mutex> #include <string> #include <thread> +#include <vector> namespace zen { @@ -52,22 +53,7 @@ public: void Log(const logging::LogMessage& Msg) override { - logging::MemoryBuffer Formatted; - { - RwLock::SharedLockScope Lock(m_FormatterLock); - if (m_Formatter) - { - m_Formatter->Format(Msg, Formatted); - } - else - { - // Fallback: use raw payload - auto Payload = Msg.GetPayload(); - Formatted.append(Payload.data(), Payload.data() + Payload.size()); - } - } - - std::string_view Text(Formatted.data(), Formatted.size()); + std::string_view Text = Msg.GetPayload(); // Strip trailing newlines while (!Text.empty() && (Text.back() == '\n' || Text.back() == '\r')) @@ -75,20 +61,22 @@ public: Text.remove_suffix(1); } - // Build CbObject with text, source, and level fields + uint64_t Seq = m_NextSequence.fetch_add(1, std::memory_order_relaxed); + + // Build CbObject with text, source, level, and sequence number fields CbObjectWriter Writer; Writer.AddString("text", Text); Writer.AddString("source", m_Source); - Writer.AddString("level", ToStringView(Msg.GetLevel())); + Writer.AddString("level", logging::ToStringView(Msg.GetLevel())); + Writer.AddInteger("seq", Seq); CbObject Obj = Writer.Save(); // Enqueue 
for async write { std::lock_guard<std::mutex> Lock(m_QueueMutex); - if (m_Queue.size() >= m_MaxQueueSize) + while (m_Queue.size() >= m_MaxQueueSize) { m_Queue.pop_front(); - m_DroppedMessages.fetch_add(1, std::memory_order_relaxed); } m_Queue.push_back(std::move(Obj)); } @@ -100,10 +88,9 @@ public: // Nothing to flush — writes happen asynchronously } - void SetFormatter(std::unique_ptr<logging::Formatter> InFormatter) override + void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override { - RwLock::ExclusiveLockScope Lock(m_FormatterLock); - m_Formatter = std::move(InFormatter); + // Not used — we output the raw payload directly } private: @@ -131,13 +118,6 @@ private: Batch.swap(m_Queue); } - uint32_t Dropped = m_DroppedMessages.exchange(0, std::memory_order_relaxed); - if (Dropped > 0) - { - // We could/should log here, but that could cause a feedback loop which - // would trigger subsequent dropped message warnings - } - if (!m_Connected && !Connect()) { if (m_Stopping) @@ -147,16 +127,21 @@ private: continue; // drop batch — will retry on next batch } + // Build a gathered buffer sequence so the entire batch is written + // in a single socket operation (or as few as the OS needs). 
+ eastl::fixed_vector<asio::const_buffer, 64> Buffers; + Buffers.reserve(Batch.size()); for (auto& Obj : Batch) { - MemoryView View = Obj.GetView(); - asio::error_code Ec; - asio::write(m_Socket, asio::buffer(View.GetData(), View.GetSize()), Ec); - if (Ec) - { - m_Connected = false; - break; // drop remaining messages in batch - } + MemoryView View = Obj.GetView(); + Buffers.emplace_back(View.GetData(), View.GetSize()); + } + + asio::error_code Ec; + asio::write(m_Socket, Buffers, Ec); + if (Ec) + { + m_Connected = false; } } } @@ -191,15 +176,14 @@ private: std::string m_Source; uint32_t m_MaxQueueSize; - // Formatter (protected by RwLock since Log is called from multiple threads) - RwLock m_FormatterLock; - std::unique_ptr<logging::Formatter> m_Formatter; + // Sequence counter — incremented atomically by Log() callers. + // Gaps in the sequence seen by the receiver indicate dropped messages. + std::atomic<uint64_t> m_NextSequence{0}; // Queue shared between Log() callers and IO thread std::mutex m_QueueMutex; std::condition_variable m_QueueCv; std::deque<CbObject> m_Queue; - std::atomic<uint32_t> m_DroppedMessages{0}; bool m_Stopping = false; std::chrono::steady_clock::time_point m_DrainDeadline; diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp new file mode 100644 index 000000000..c62cc4099 --- /dev/null +++ b/src/zenutil/sessionsclient.cpp @@ -0,0 +1,377 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenutil/sessionsclient.h> + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging/logmsg.h> +#include <zencore/thread.h> +#include <zenhttp/httpclient.h> + +#include <thread> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// +// SessionLogSink — batching log sink that forwards to /sessions/{id}/log +// + +static const char* +LogLevelToString(logging::LogLevel Level) +{ + switch (Level) + { + case logging::Trace: + return "trace"; + case logging::Debug: + return "debug"; + case logging::Info: + return "info"; + case logging::Warn: + return "warn"; + case logging::Err: + return "error"; + case logging::Critical: + return "critical"; + default: + return "info"; + } +} + +struct BufferedLogEntry +{ + enum class Type : uint8_t + { + Log, + Flush, + Shutdown + }; + + Type Type = Type::Log; + std::string Level; + std::string Message; +}; + +class SessionLogSink final : public logging::Sink +{ +public: + SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath)) + { + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings); + + SetLevel(logging::Info); + + m_WorkerThread = std::thread([this]() { + zen::SetCurrentThreadName("SessionLog"); + WorkerLoop(); + }); + } + + ~SessionLogSink() override + { + BufferedLogEntry ShutdownMsg; + ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown; + m_Queue.Enqueue(std::move(ShutdownMsg)); + + if (m_WorkerThread.joinable()) + { + m_WorkerThread.join(); + } + } + + void Log(const logging::LogMessage& Msg) override + { + BufferedLogEntry Entry; + Entry.Type = BufferedLogEntry::Type::Log; + Entry.Level = 
LogLevelToString(Msg.GetLevel()); + Entry.Message = std::string(Msg.GetPayload()); + m_Queue.Enqueue(std::move(Entry)); + } + + void Flush() override + { + // Best-effort: enqueue a flush marker so the worker sends any pending entries + BufferedLogEntry FlushMsg; + FlushMsg.Type = BufferedLogEntry::Type::Flush; + m_Queue.Enqueue(std::move(FlushMsg)); + } + + void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override + { + // No formatting needed — we send raw message text + } + +private: + static constexpr size_t BatchSize = 50; + + void WorkerLoop() + { + std::vector<BufferedLogEntry> Batch; + Batch.reserve(BatchSize); + + BufferedLogEntry Msg; + while (m_Queue.WaitAndDequeue(Msg)) + { + if (Msg.Type == BufferedLogEntry::Type::Shutdown) + { + // Drain remaining log entries + BufferedLogEntry Remaining; + while (m_Queue.WaitAndDequeue(Remaining)) + { + if (Remaining.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Remaining)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + + if (Msg.Type == BufferedLogEntry::Type::Flush) + { + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + continue; + } + + // Log entry + Batch.push_back(std::move(Msg)); + + if (Batch.size() >= BatchSize) + { + SendBatch(Batch); + Batch.clear(); + } + else + { + // Drain any additional queued entries without blocking + while (Batch.size() < BatchSize && m_Queue.Size() > 0) + { + BufferedLogEntry Extra; + if (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Shutdown) + { + if (!Batch.empty()) + { + SendBatch(Batch); + } + // Drain remaining + while (m_Queue.WaitAndDequeue(Extra)) + { + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + } + if (!Batch.empty()) + { + SendBatch(Batch); + } + return; + } + if (Extra.Type == BufferedLogEntry::Type::Log) + { + Batch.push_back(std::move(Extra)); + } + else if (Extra.Type == 
BufferedLogEntry::Type::Flush) + { + break; + } + } + } + + if (!Batch.empty()) + { + SendBatch(Batch); + Batch.clear(); + } + } + } + } + + void SendBatch(const std::vector<BufferedLogEntry>& Batch) + { + try + { + CbObjectWriter Writer; + Writer.BeginArray("entries"); + for (const BufferedLogEntry& Entry : Batch) + { + Writer.BeginObject(); + Writer << "level" << Entry.Level; + Writer << "message" << Entry.Message; + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save()); + (void)Result; // Best-effort + } + catch (const std::exception&) + { + // Best-effort — silently discard on failure + } + } + + std::string m_LogPath; + std::unique_ptr<HttpClient> m_Http; + BlockingQueue<BufferedLogEntry> m_Queue; + std::thread m_WorkerThread; +}; + +SessionsServiceClient::SessionsServiceClient(Options Opts) +: m_Log(logging::Get("sessionsclient")) +, m_Options(std::move(Opts)) +, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId)) +{ + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(3000); + m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings); +} + +SessionsServiceClient::~SessionsServiceClient() = default; + +CbObject +SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const +{ + CbObjectWriter Writer; + Writer << "appname" << m_Options.AppName; + if (!m_Options.Mode.empty()) + { + Writer << "mode" << m_Options.Mode; + } + if (m_Options.JobId != Oid::Zero) + { + Writer << "jobid" << m_Options.JobId; + } + if (Metadata.GetSize() > 0) + { + Writer.AddObject("metadata", Metadata); + } + return Writer.Save(); +} + +bool +SessionsServiceClient::Announce(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body)); + + if (Result.Error) + { + ZEN_WARN("sessions announce failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + 
static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session announced to '{}'", m_Options.TargetUrl); + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::UpdateMetadata(CbObjectView Metadata) +{ + try + { + CbObject Body = BuildRequestBody(Metadata); + + MemoryView View = Body.GetView(); + IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject); + + HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload); + + if (Result.Error) + { + ZEN_WARN("sessions update failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + return true; + } + catch (const std::exception& Ex) + { + ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +bool +SessionsServiceClient::Remove() +{ + try + { + HttpClient::Response Result = m_Http->Delete(m_SessionPath); + + if (Result.Error) + { + ZEN_WARN("sessions remove failed for '{}': HTTP error {} - {}", + m_Options.TargetUrl, + static_cast<int>(Result.Error->ErrorCode), + Result.Error->ErrorMessage); + return false; + } + if (!IsHttpOk(Result.StatusCode)) + { + ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode)); + return false; + } + + ZEN_INFO("session removed from '{}'", m_Options.TargetUrl); + return true; + } + catch 
(const std::exception& Ex) + { + ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what()); + return false; + } +} + +logging::SinkPtr +SessionsServiceClient::CreateLogSink() +{ + std::string LogPath = m_SessionPath + "/log"; + return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath))); +} + +} // namespace zen diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp index 9f1d1a02c..04718b543 100644 --- a/src/zenutil/splitconsole/logstreamlistener.cpp +++ b/src/zenutil/splitconsole/logstreamlistener.cpp @@ -2,6 +2,7 @@ #include <zenutil/splitconsole/logstreamlistener.h> +#include <zenbase/refcount.h> #include <zencore/compactbinary.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -11,7 +12,6 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio.hpp> ZEN_THIRD_PARTY_INCLUDES_END -#include <atomic> #include <vector> namespace zen { @@ -19,21 +19,17 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // LogStreamSession — reads CbObject-framed messages from a single TCP connection -class LogStreamSession : public std::enable_shared_from_this<LogStreamSession> +class LogStreamSession : public RefCounted { public: - LogStreamSession(asio::ip::tcp::socket Socket, std::atomic<LogStreamHandler*>& Handler) - : m_Socket(std::move(Socket)) - , m_Handler(Handler) - { - } + LogStreamSession(asio::ip::tcp::socket Socket, LogStreamTarget& Target) : m_Socket(std::move(Socket)), m_Target(Target) {} void Start() { DoRead(); } private: void DoRead() { - auto Self = shared_from_this(); + Ref<LogStreamSession> Self(this); m_Socket.async_read_some(asio::buffer(m_ReadBuf.data() + m_BufferUsed, m_ReadBuf.size() - m_BufferUsed), [Self](const asio::error_code& Ec, std::size_t BytesRead) { if (Ec) @@ -71,8 +67,17 @@ private: std::string_view Text = Obj["text"].AsString(); std::string_view Source = Obj["source"].AsString(); + // Check sequence number for 
gaps (dropped messages) + uint64_t Seq = Obj["seq"].AsUInt64(); + if (Seq > m_NextExpectedSeq) + { + uint64_t Dropped = Seq - m_NextExpectedSeq; + m_Target.AppendLogLine(fmt::format("[{}] *** {} log message(s) dropped ***", Source.empty() ? "log" : Source, Dropped)); + } + m_NextExpectedSeq = Seq + 1; + // Split multi-line messages into individual AppendLogLine calls so that - // each line gets its own row in the SplitConsole ring buffer. + // each line gets its own row in the target's log output. while (!Text.empty()) { std::string_view Line = Text; @@ -98,17 +103,13 @@ private: continue; } - LogStreamHandler* Handler = m_Handler.load(std::memory_order_acquire); - if (Handler) + if (!Source.empty()) { - if (!Source.empty()) - { - Handler->AppendLogLine(fmt::format("[{}] {}", Source, Line)); - } - else - { - Handler->AppendLogLine(std::string(Line)); - } + m_Target.AppendLogLine(fmt::format("[{}] {}", Source, Line)); + } + else + { + m_Target.AppendLogLine(Line); } } @@ -128,10 +129,11 @@ private: } } - asio::ip::tcp::socket m_Socket; - std::atomic<LogStreamHandler*>& m_Handler; - std::array<uint8_t, 65536> m_ReadBuf{}; - std::size_t m_BufferUsed = 0; + asio::ip::tcp::socket m_Socket; + LogStreamTarget& m_Target; + std::array<uint8_t, 65536> m_ReadBuf{}; + std::size_t m_BufferUsed = 0; + uint64_t m_NextExpectedSeq = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -140,7 +142,10 @@ private: struct LogStreamListener::Impl { // Owned io_context mode — creates and runs its own thread - explicit Impl(uint16_t Port) : m_OwnedIoContext(std::make_unique<asio::io_context>()), m_Acceptor(*m_OwnedIoContext) + Impl(LogStreamTarget& Target, uint16_t Port) + : m_Target(Target) + , m_OwnedIoContext(std::make_unique<asio::io_context>()) + , m_Acceptor(*m_OwnedIoContext) { SetupAcceptor(Port); m_IoThread = std::thread([this]() { @@ -150,7 +155,10 @@ struct LogStreamListener::Impl } // External io_context mode — caller drives the io_context - 
Impl(asio::io_context& IoContext, uint16_t Port) : m_Acceptor(IoContext) { SetupAcceptor(Port); } + Impl(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port) : m_Target(Target), m_Acceptor(IoContext) + { + SetupAcceptor(Port); + } ~Impl() { Shutdown(); } @@ -177,11 +185,12 @@ struct LogStreamListener::Impl uint16_t GetPort() const { return m_Port; } - void SetHandler(LogStreamHandler* Handler) { m_Handler.store(Handler, std::memory_order_release); } - private: void SetupAcceptor(uint16_t Port) { + auto& IoCtx = m_OwnedIoContext ? *m_OwnedIoContext : m_Acceptor.get_executor().context(); + ZEN_UNUSED(IoCtx); + // Try dual-stack IPv6 first (accepts both IPv4 and IPv6), fall back to IPv4-only asio::error_code Ec; m_Acceptor.open(asio::ip::tcp::v6(), Ec); @@ -218,7 +227,7 @@ private: return; // acceptor closed } - auto Session = std::make_shared<LogStreamSession>(std::move(Socket), m_Handler); + Ref<LogStreamSession> Session(new LogStreamSession(std::move(Socket), m_Target)); Session->Start(); if (!m_Stopped.load()) @@ -228,7 +237,7 @@ private: }); } - std::atomic<LogStreamHandler*> m_Handler{nullptr}; + LogStreamTarget& m_Target; std::unique_ptr<asio::io_context> m_OwnedIoContext; // null when using external io_context asio::ip::tcp::acceptor m_Acceptor; std::thread m_IoThread; @@ -239,22 +248,17 @@ private: ////////////////////////////////////////////////////////////////////////// // LogStreamListener -LogStreamListener::LogStreamListener(uint16_t Port) : m_Impl(std::make_unique<Impl>(Port)) +LogStreamListener::LogStreamListener(LogStreamTarget& Target, uint16_t Port) : m_Impl(std::make_unique<Impl>(Target, Port)) { } -LogStreamListener::LogStreamListener(asio::io_context& IoContext, uint16_t Port) : m_Impl(std::make_unique<Impl>(IoContext, Port)) +LogStreamListener::LogStreamListener(LogStreamTarget& Target, asio::io_context& IoContext, uint16_t Port) +: m_Impl(std::make_unique<Impl>(Target, IoContext, Port)) { } 
LogStreamListener::~LogStreamListener() = default; -void -LogStreamListener::SetHandler(LogStreamHandler* Handler) -{ - m_Impl->SetHandler(Handler); -} - uint16_t LogStreamListener::GetPort() const { @@ -268,3 +272,155 @@ LogStreamListener::Shutdown() } } // namespace zen + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> +# include <zenutil/splitconsole/tcplogstreamsink.h> + +namespace zen { + +void +logstreamlistener_forcelink() +{ +} + +namespace { + + class CollectingTarget : public LogStreamTarget + { + public: + void AppendLogLine(std::string_view Text) override + { + std::lock_guard<std::mutex> Lock(m_Mutex); + m_Lines.emplace_back(Text); + m_Cv.notify_all(); + } + + std::vector<std::string> WaitForLines(size_t Count, std::chrono::milliseconds Timeout = std::chrono::milliseconds(5000)) + { + std::unique_lock<std::mutex> Lock(m_Mutex); + m_Cv.wait_for(Lock, Timeout, [&]() { return m_Lines.size() >= Count; }); + return m_Lines; + } + + private: + std::mutex m_Mutex; + std::condition_variable m_Cv; + std::vector<std::string> m_Lines; + }; + + logging::LogMessage MakeLogMessage(std::string_view Text, logging::LogLevel Level = logging::Info) + { + static logging::LogPoint Point{{}, Level, {}}; + Point.Level = Level; + return logging::LogMessage(Point, "test", Text); + } + +} // namespace + +TEST_SUITE_BEGIN("util.logstreamlistener"); + +TEST_CASE("BasicMessageDelivery") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "TestSource", 64); + Sink.Log(MakeLogMessage("hello world")); + Sink.Log(MakeLogMessage("second line")); + } + + auto Lines = Target.WaitForLines(2); + REQUIRE(Lines.size() == 2); + CHECK(Lines[0] == "[TestSource] hello world"); + CHECK(Lines[1] == "[TestSource] second line"); +} + +TEST_CASE("MultiLineMessageSplit") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "src", 64); + 
Sink.Log(MakeLogMessage("line1\nline2\nline3")); + } + + auto Lines = Target.WaitForLines(3); + REQUIRE(Lines.size() == 3); + CHECK(Lines[0] == "[src] line1"); + CHECK(Lines[1] == "[src] line2"); + CHECK(Lines[2] == "[src] line3"); +} + +TEST_CASE("DroppedMessageDetection") +{ + // Test sequence-gap detection deterministically by sending raw CbObjects + // with an explicit gap in sequence numbers, bypassing TcpLogStreamSink. + CollectingTarget Target; + LogStreamListener Listener(Target); + + { + asio::io_context IoContext; + asio::ip::tcp::socket Socket(IoContext); + Socket.connect(asio::ip::tcp::endpoint(asio::ip::make_address("127.0.0.1"), Listener.GetPort())); + + // Send seq=0, then seq=5 — the listener should detect a gap of 4 + for (uint64_t Seq : {uint64_t(0), uint64_t(5)}) + { + CbObjectWriter Writer; + Writer.AddString("text", fmt::format("msg{}", Seq)); + Writer.AddString("source", "src"); + Writer.AddInteger("seq", Seq); + CbObject Obj = Writer.Save(); + MemoryView View = Obj.GetView(); + + asio::write(Socket, asio::buffer(View.GetData(), View.GetSize())); + } + } + + // Expect: msg0, drop notice, msg5 + auto Lines = Target.WaitForLines(3); + REQUIRE(Lines.size() >= 3); + CHECK(Lines[0] == "[src] msg0"); + CHECK(Lines[1].find("4 log message(s) dropped") != std::string::npos); + CHECK(Lines[2] == "[src] msg5"); +} + +TEST_CASE("SequenceNumbersAreContiguous") +{ + CollectingTarget Target; + LogStreamListener Listener(Target); + + constexpr int NumMessages = 5; + { + TcpLogStreamSink Sink("127.0.0.1", Listener.GetPort(), "seq", 64); + for (int i = 0; i < NumMessages; i++) + { + Sink.Log(MakeLogMessage(fmt::format("msg{}", i))); + } + } + + auto Lines = Target.WaitForLines(NumMessages); + REQUIRE(Lines.size() == NumMessages); + + // No "dropped" notices should appear when nothing is dropped + for (auto& Line : Lines) + { + CHECK(Line.find("dropped") == std::string::npos); + } + + // Verify ordering + for (int i = 0; i < NumMessages; i++) + { + 
CHECK(Lines[i] == fmt::format("[seq] msg{}", i)); + } +} + +TEST_SUITE_END(); + +} // namespace zen + +#endif diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 734813b69..c4d01554d 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -7,8 +7,9 @@ # include <zenutil/cloud/imdscredentials.h> # include <zenutil/cloud/s3client.h> # include <zenutil/cloud/sigv4.h> -# include <zenutil/rpcrecording.h> # include <zenutil/config/commandlineoptions.h> +# include <zenutil/rpcrecording.h> +# include <zenutil/splitconsole/logstreamlistener.h> # include <zenutil/wildcard.h> namespace zen { @@ -19,6 +20,7 @@ zenutil_forcelinktests() cache::rpcrecord_forcelink(); commandlineoptions_forcelink(); imdscredentials_forcelink(); + logstreamlistener_forcelink(); s3client_forcelink(); sigv4_forcelink(); wildcard_forcelink(); |