diff options
| author | Stefan Boberg <[email protected]> | 2026-03-21 21:43:22 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-21 21:43:22 +0100 |
| commit | 14ca5b35d0fc477ba30f10b80f937b523fd7e930 (patch) | |
| tree | 8aab2acfec8be1af4bf0dffdb4badc3b64bf8385 /src/zenutil/include | |
| parent | fix null stats provider crash when build store is not configured (#875) (diff) | |
| download | zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.tar.xz zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.zip | |
Interprocess pipe support (for stdout/stderr capture) (#866)
- **RAII pipe handles for child process stdout/stderr capture**: `StdoutPipeHandles` is now a proper RAII type with automatic cleanup, move semantics, and partial close support. This makes it safe to use pipes for capturing child process output without risking handle/fd leaks.
- **Optional separate stderr pipe**: `CreateProcOptions` now accepts a `StderrPipe` field so callers can capture stdout and stderr independently. When null (default), stderr shares the stdout pipe as before.
- **LogStreamListener with pluggable handler**: The TCP log stream listener accepts connections from remote processes and delivers parsed log lines through a `LogStreamHandler` interface, set dynamically via `SetHandler()`. This allows any client to receive log messages without depending on a specific console implementation.
- **TcpLogStreamSink for zen::logging**: A logging sink that forwards log messages to a `LogStreamListener` over TCP, using the native `zen::logging::Sink` infrastructure with proper thread-safe synchronization.
- **Reliable child process exit codes on Linux**: `waitpid` result handling is fixed so `ProcessHandle::GetExitCode()` returns the real exit code. `ProcessHandle::Reset()` reaps zombies directly, replacing the global `IgnoreChildSignals()` which prevented exit code collection entirely. Also fixes a TOCTOU race in `ProcessHandle::Wait()` on Linux/Mac.
- **Pipe capture test suite**: Tests covering stdout/stderr capture via pipes (both shared and separate modes), RAII cleanup, move semantics, and exit code propagation using `zentest-appstub` as the child process.
- **Service command integration tests**: Shell-based integration tests for `zen service` covering the full lifecycle (install, status, start, stop, uninstall) on all three platforms — Linux (systemd), macOS (launchd), and Windows (SCM via PowerShell).
- **Test script reorganization**: Platform-specific test scripts moved from `scripts/test_scripts/` into `scripts/test_linux/`, `test_mac/`, and `test_windows/`.
Diffstat (limited to 'src/zenutil/include')
4 files changed, 281 insertions, 3 deletions
diff --git a/src/zenutil/include/zenutil/config/loggingconfig.h b/src/zenutil/include/zenutil/config/loggingconfig.h index b55b2d9f7..33a5eb172 100644 --- a/src/zenutil/include/zenutil/config/loggingconfig.h +++ b/src/zenutil/include/zenutil/config/loggingconfig.h @@ -16,10 +16,12 @@ struct ZenLoggingConfig { bool NoConsoleOutput = false; // Control default use of stdout for diagnostics bool QuietConsole = false; // Configure console logger output to level WARN + bool ForceColor = false; // Force colored output even when stdout is not a terminal std::filesystem::path AbsLogFile; // Absolute path to main log file std::string Loggers[logging::LogLevelCount]; - std::string LogId; // Id for tagging log output - std::string OtelEndpointUri; // OpenTelemetry endpoint URI + std::string LogId; // Id for tagging log output + std::string OtelEndpointUri; // OpenTelemetry endpoint URI + std::string LogStreamEndpoint; // TCP log stream endpoint (host:port) }; void ApplyLoggingOptions(cxxopts::Options& options, ZenLoggingConfig& LoggingConfig); diff --git a/src/zenutil/include/zenutil/logging.h b/src/zenutil/include/zenutil/logging.h index 95419c274..282ae1b9a 100644 --- a/src/zenutil/include/zenutil/logging.h +++ b/src/zenutil/include/zenutil/logging.h @@ -28,7 +28,8 @@ struct LoggingOptions bool AllowAsync = true; bool NoConsoleOutput = false; bool QuietConsole = false; - std::filesystem::path AbsLogFile; // Absolute path to main log file + bool ForceColor = false; // Force colored output even when stdout is not a terminal + std::filesystem::path AbsLogFile; // Absolute path to main log file std::string LogId; }; diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h new file mode 100644 index 000000000..06544308c --- /dev/null +++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h @@ -0,0 +1,66 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <cstdint> +#include <memory> +#include <string> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio/io_context.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +/// Interface for receiving log lines from a LogStreamListener. +/// Clients implement this to route received log messages to their desired output. +class LogStreamHandler +{ +public: + virtual ~LogStreamHandler() = default; + + virtual void AppendLogLine(std::string Line) = 0; +}; + +/// TCP listener that accepts connections from remote processes streaming log messages. +/// Each message is a CbObject with fields: "text" (string), "source" (string), "level" (string, optional). +/// +/// A LogStreamHandler can be set to receive parsed log lines. If no handler is set, +/// received messages are silently discarded. +/// +/// Two modes of operation: +/// - Owned thread: pass only Port; an internal IO thread is created. +/// - External io_context: pass an existing asio::io_context; no thread is created, +/// the caller is responsible for running the io_context. +class LogStreamListener +{ +public: + /// Start listening with an internal IO thread. + explicit LogStreamListener(uint16_t Port = 0); + + /// Start listening on an externally-driven io_context (no thread created). + LogStreamListener(asio::io_context& IoContext, uint16_t Port = 0); + + ~LogStreamListener(); + + LogStreamListener(const LogStreamListener&) = delete; + LogStreamListener& operator=(const LogStreamListener&) = delete; + + /// Set the handler that will receive parsed log lines. May be called at any time. + /// Pass nullptr to stop delivering messages. + void SetHandler(LogStreamHandler* Handler); + + /// Returns the actual port the listener is bound to. + uint16_t GetPort() const; + + /// Gracefully stop accepting new connections and shut down existing sessions. + void Shutdown(); + +private: + struct Impl; + std::unique_ptr<Impl> m_Impl; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h new file mode 100644 index 000000000..2ab7d469e --- /dev/null +++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h @@ -0,0 +1,209 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinarybuilder.h> +#include <zencore/logging.h> +#include <zencore/logging/sink.h> +#include <zencore/thread.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <condition_variable> +#include <deque> +#include <mutex> +#include <string> +#include <thread> + +namespace zen { + +/// Logging sink that connects to a LogStreamListener via TCP and sends CbObject-framed log messages. +/// Connection is lazy (on first enqueued message) and silently reconnects on failure. +/// Messages are serialized on the caller thread and written asynchronously from a dedicated IO thread. +/// A bounded queue drops the oldest messages on overflow to prevent unbounded memory growth. +class TcpLogStreamSink : public logging::Sink +{ +public: + TcpLogStreamSink(const std::string& Host, uint16_t Port, std::string Source, uint32_t MaxQueueSize = 4096) + : m_Host(Host) + , m_Port(Port) + , m_Source(std::move(Source)) + , m_MaxQueueSize(MaxQueueSize) + { + m_IoThread = std::thread([this]() { IoThreadMain(); }); + } + + ~TcpLogStreamSink() override + { + { + std::lock_guard<std::mutex> Lock(m_QueueMutex); + m_Stopping = true; + m_DrainDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + } + m_QueueCv.notify_one(); + if (m_IoThread.joinable()) + { + m_IoThread.join(); + } + } + + void Log(const logging::LogMessage& Msg) override + { + logging::MemoryBuffer Formatted; + { + RwLock::SharedLockScope Lock(m_FormatterLock); + if (m_Formatter) + { + m_Formatter->Format(Msg, Formatted); + } + else + { + // Fallback: use raw payload + auto Payload = Msg.GetPayload(); + Formatted.append(Payload.data(), Payload.data() + Payload.size()); + } + } + + std::string_view Text(Formatted.data(), Formatted.size()); + + // Strip trailing newlines + while (!Text.empty() && (Text.back() == '\n' || Text.back() == '\r')) + { + Text.remove_suffix(1); + } + + // Build CbObject with text, source, and level fields + CbObjectWriter Writer; + Writer.AddString("text", Text); + Writer.AddString("source", m_Source); + Writer.AddString("level", ToStringView(Msg.GetLevel())); + CbObject Obj = Writer.Save(); + + // Enqueue for async write + { + std::lock_guard<std::mutex> Lock(m_QueueMutex); + if (m_Queue.size() >= m_MaxQueueSize) + { + m_Queue.pop_front(); + m_DroppedMessages.fetch_add(1, std::memory_order_relaxed); + } + m_Queue.push_back(std::move(Obj)); + } + m_QueueCv.notify_one(); + } + + void Flush() override + { + // Nothing to flush — writes happen asynchronously + } + + void SetFormatter(std::unique_ptr<logging::Formatter> InFormatter) override + { + RwLock::ExclusiveLockScope Lock(m_FormatterLock); + m_Formatter = std::move(InFormatter); + } + +private: + void IoThreadMain() + { + zen::SetCurrentThreadName("TcpLogSink"); + + for (;;) + { + std::deque<CbObject> Batch; + { + std::unique_lock<std::mutex> Lock(m_QueueMutex); + m_QueueCv.wait(Lock, [this]() { return m_Stopping || !m_Queue.empty(); }); + + if (m_Stopping && m_Queue.empty()) + { + break; + } + + if (m_Stopping && std::chrono::steady_clock::now() >= m_DrainDeadline) + { + break; + } + + Batch.swap(m_Queue); + } + + uint32_t Dropped = m_DroppedMessages.exchange(0, std::memory_order_relaxed); + if (Dropped > 0) + { + // We could/should log here, but that could cause a feedback loop which + // would trigger subsequent dropped message warnings + } + + if (!m_Connected && !Connect()) + { + if (m_Stopping) + { + break; // don't retry during shutdown + } + continue; // drop batch — will retry on next batch + } + + for (auto& Obj : Batch) + { + MemoryView View = Obj.GetView(); + asio::error_code Ec; + asio::write(m_Socket, asio::buffer(View.GetData(), View.GetSize()), Ec); + if (Ec) + { + m_Connected = false; + break; // drop remaining messages in batch + } + } + } + } + + bool Connect() + { + try + { + asio::ip::tcp::resolver Resolver(m_IoContext); + auto Endpoints = Resolver.resolve(m_Host, std::to_string(m_Port)); + asio::connect(m_Socket, Endpoints); + m_Connected = true; + return true; + } + catch (const std::exception&) + { + // Reset the socket for next attempt + m_Socket = asio::ip::tcp::socket(m_IoContext); + m_Connected = false; + return false; + } + } + + // IO thread state (only accessed from m_IoThread) + asio::io_context m_IoContext; + asio::ip::tcp::socket m_Socket{m_IoContext}; + bool m_Connected = false; + + // Configuration (immutable after construction) + std::string m_Host; + uint16_t m_Port; + std::string m_Source; + uint32_t m_MaxQueueSize; + + // Formatter (protected by RwLock since Log is called from multiple threads) + RwLock m_FormatterLock; + std::unique_ptr<logging::Formatter> m_Formatter; + + // Queue shared between Log() callers and IO thread + std::mutex m_QueueMutex; + std::condition_variable m_QueueCv; + std::deque<CbObject> m_Queue; + std::atomic<uint32_t> m_DroppedMessages{0}; + bool m_Stopping = false; + std::chrono::steady_clock::time_point m_DrainDeadline; + + std::thread m_IoThread; +}; + +} // namespace zen |