aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-21 21:43:22 +0100
committerGitHub Enterprise <[email protected]>2026-03-21 21:43:22 +0100
commit14ca5b35d0fc477ba30f10b80f937b523fd7e930 (patch)
tree8aab2acfec8be1af4bf0dffdb4badc3b64bf8385 /src/zenutil/include
parentfix null stats provider crash when build store is not configured (#875) (diff)
downloadzen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.tar.xz
zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.zip
Interprocess pipe support (for stdout/stderr capture) (#866)
- **RAII pipe handles for child process stdout/stderr capture**: `StdoutPipeHandles` is now a proper RAII type with automatic cleanup, move semantics, and partial close support. This makes it safe to use pipes for capturing child process output without risking handle/fd leaks. - **Optional separate stderr pipe**: `CreateProcOptions` now accepts a `StderrPipe` field so callers can capture stdout and stderr independently. When null (default), stderr shares the stdout pipe as before. - **LogStreamListener with pluggable handler**: The TCP log stream listener accepts connections from remote processes and delivers parsed log lines through a `LogStreamHandler` interface, set dynamically via `SetHandler()`. This allows any client to receive log messages without depending on a specific console implementation. - **TcpLogStreamSink for zen::logging**: A logging sink that forwards log messages to a `LogStreamListener` over TCP, using the native `zen::logging::Sink` infrastructure with proper thread-safe synchronization. - **Reliable child process exit codes on Linux**: `waitpid` result handling is fixed so `ProcessHandle::GetExitCode()` returns the real exit code. `ProcessHandle::Reset()` reaps zombies directly, replacing the global `IgnoreChildSignals()` which prevented exit code collection entirely. Also fixes a TOCTOU race in `ProcessHandle::Wait()` on Linux/Mac. - **Pipe capture test suite**: Tests covering stdout/stderr capture via pipes (both shared and separate modes), RAII cleanup, move semantics, and exit code propagation using `zentest-appstub` as the child process. - **Service command integration tests**: Shell-based integration tests for `zen service` covering the full lifecycle (install, status, start, stop, uninstall) on all three platforms — Linux (systemd), macOS (launchd), and Windows (SCM via PowerShell). - **Test script reorganization**: Platform-specific test scripts moved from `scripts/test_scripts/` into `scripts/test_linux/`, `test_mac/`, and `test_windows/`.
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/config/loggingconfig.h6
-rw-r--r--src/zenutil/include/zenutil/logging.h3
-rw-r--r--src/zenutil/include/zenutil/splitconsole/logstreamlistener.h66
-rw-r--r--src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h209
4 files changed, 281 insertions, 3 deletions
diff --git a/src/zenutil/include/zenutil/config/loggingconfig.h b/src/zenutil/include/zenutil/config/loggingconfig.h
index b55b2d9f7..33a5eb172 100644
--- a/src/zenutil/include/zenutil/config/loggingconfig.h
+++ b/src/zenutil/include/zenutil/config/loggingconfig.h
@@ -16,10 +16,12 @@ struct ZenLoggingConfig
{
bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
bool QuietConsole = false; // Configure console logger output to level WARN
+ bool ForceColor = false; // Force colored output even when stdout is not a terminal
std::filesystem::path AbsLogFile; // Absolute path to main log file
std::string Loggers[logging::LogLevelCount];
- std::string LogId; // Id for tagging log output
- std::string OtelEndpointUri; // OpenTelemetry endpoint URI
+ std::string LogId; // Id for tagging log output
+ std::string OtelEndpointUri; // OpenTelemetry endpoint URI
+ std::string LogStreamEndpoint; // TCP log stream endpoint (host:port)
};
void ApplyLoggingOptions(cxxopts::Options& options, ZenLoggingConfig& LoggingConfig);
diff --git a/src/zenutil/include/zenutil/logging.h b/src/zenutil/include/zenutil/logging.h
index 95419c274..282ae1b9a 100644
--- a/src/zenutil/include/zenutil/logging.h
+++ b/src/zenutil/include/zenutil/logging.h
@@ -28,7 +28,8 @@ struct LoggingOptions
bool AllowAsync = true;
bool NoConsoleOutput = false;
bool QuietConsole = false;
- std::filesystem::path AbsLogFile; // Absolute path to main log file
+ bool ForceColor = false; // Force colored output even when stdout is not a terminal
+ std::filesystem::path AbsLogFile; // Absolute path to main log file
std::string LogId;
};
diff --git a/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
new file mode 100644
index 000000000..06544308c
--- /dev/null
+++ b/src/zenutil/include/zenutil/splitconsole/logstreamlistener.h
@@ -0,0 +1,66 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio/io_context.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+/// Interface for receiving log lines from a LogStreamListener.
+/// Clients implement this to route received log messages to their desired output.
+class LogStreamHandler
+{
+public:
+ virtual ~LogStreamHandler() = default;
+
+ virtual void AppendLogLine(std::string Line) = 0;
+};
+
+/// TCP listener that accepts connections from remote processes streaming log messages.
+/// Each message is a CbObject with fields: "text" (string), "source" (string), "level" (string, optional).
+///
+/// A LogStreamHandler can be set to receive parsed log lines. If no handler is set,
+/// received messages are silently discarded.
+///
+/// Two modes of operation:
+/// - Owned thread: pass only Port; an internal IO thread is created.
+/// - External io_context: pass an existing asio::io_context; no thread is created,
+/// the caller is responsible for running the io_context.
+class LogStreamListener
+{
+public:
+ /// Start listening with an internal IO thread.
+ explicit LogStreamListener(uint16_t Port = 0);
+
+ /// Start listening on an externally-driven io_context (no thread created).
+ LogStreamListener(asio::io_context& IoContext, uint16_t Port = 0);
+
+ ~LogStreamListener();
+
+ LogStreamListener(const LogStreamListener&) = delete;
+ LogStreamListener& operator=(const LogStreamListener&) = delete;
+
+ /// Set the handler that will receive parsed log lines. May be called at any time.
+ /// Pass nullptr to stop delivering messages.
+ void SetHandler(LogStreamHandler* Handler);
+
+ /// Returns the actual port the listener is bound to.
+ uint16_t GetPort() const;
+
+ /// Gracefully stop accepting new connections and shut down existing sessions.
+ void Shutdown();
+
+private:
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
new file mode 100644
index 000000000..2ab7d469e
--- /dev/null
+++ b/src/zenutil/include/zenutil/splitconsole/tcplogstreamsink.h
@@ -0,0 +1,209 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/logging.h>
+#include <zencore/logging/sink.h>
+#include <zencore/thread.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <string>
+#include <thread>
+
+namespace zen {
+
+/// Logging sink that connects to a LogStreamListener via TCP and sends CbObject-framed log messages.
+/// Connection is lazy (on first enqueued message) and silently reconnects on failure.
+/// Messages are serialized on the caller thread and written asynchronously from a dedicated IO thread.
+/// A bounded queue drops the oldest messages on overflow to prevent unbounded memory growth.
+class TcpLogStreamSink : public logging::Sink
+{
+public:
+ TcpLogStreamSink(const std::string& Host, uint16_t Port, std::string Source, uint32_t MaxQueueSize = 4096)
+ : m_Host(Host)
+ , m_Port(Port)
+ , m_Source(std::move(Source))
+ , m_MaxQueueSize(MaxQueueSize)
+ {
+ m_IoThread = std::thread([this]() { IoThreadMain(); });
+ }
+
+ ~TcpLogStreamSink() override
+ {
+ {
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ m_Stopping = true;
+ m_DrainDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
+ }
+ m_QueueCv.notify_one();
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ }
+
+ void Log(const logging::LogMessage& Msg) override
+ {
+ logging::MemoryBuffer Formatted;
+ {
+ RwLock::SharedLockScope Lock(m_FormatterLock);
+ if (m_Formatter)
+ {
+ m_Formatter->Format(Msg, Formatted);
+ }
+ else
+ {
+ // Fallback: use raw payload
+ auto Payload = Msg.GetPayload();
+ Formatted.append(Payload.data(), Payload.data() + Payload.size());
+ }
+ }
+
+ std::string_view Text(Formatted.data(), Formatted.size());
+
+ // Strip trailing newlines
+ while (!Text.empty() && (Text.back() == '\n' || Text.back() == '\r'))
+ {
+ Text.remove_suffix(1);
+ }
+
+ // Build CbObject with text, source, and level fields
+ CbObjectWriter Writer;
+ Writer.AddString("text", Text);
+ Writer.AddString("source", m_Source);
+ Writer.AddString("level", ToStringView(Msg.GetLevel()));
+ CbObject Obj = Writer.Save();
+
+ // Enqueue for async write
+ {
+ std::lock_guard<std::mutex> Lock(m_QueueMutex);
+ if (m_Queue.size() >= m_MaxQueueSize)
+ {
+ m_Queue.pop_front();
+ m_DroppedMessages.fetch_add(1, std::memory_order_relaxed);
+ }
+ m_Queue.push_back(std::move(Obj));
+ }
+ m_QueueCv.notify_one();
+ }
+
+ void Flush() override
+ {
+ // Nothing to flush — writes happen asynchronously
+ }
+
+ void SetFormatter(std::unique_ptr<logging::Formatter> InFormatter) override
+ {
+ RwLock::ExclusiveLockScope Lock(m_FormatterLock);
+ m_Formatter = std::move(InFormatter);
+ }
+
+private:
+ void IoThreadMain()
+ {
+ zen::SetCurrentThreadName("TcpLogSink");
+
+ for (;;)
+ {
+ std::deque<CbObject> Batch;
+ {
+ std::unique_lock<std::mutex> Lock(m_QueueMutex);
+ m_QueueCv.wait(Lock, [this]() { return m_Stopping || !m_Queue.empty(); });
+
+ if (m_Stopping && m_Queue.empty())
+ {
+ break;
+ }
+
+ if (m_Stopping && std::chrono::steady_clock::now() >= m_DrainDeadline)
+ {
+ break;
+ }
+
+ Batch.swap(m_Queue);
+ }
+
+ uint32_t Dropped = m_DroppedMessages.exchange(0, std::memory_order_relaxed);
+ if (Dropped > 0)
+ {
+ // We could/should log here, but that could cause a feedback loop which
+ // would trigger subsequent dropped message warnings
+ }
+
+ if (!m_Connected && !Connect())
+ {
+ if (m_Stopping)
+ {
+ break; // don't retry during shutdown
+ }
+ continue; // drop batch — will retry on next batch
+ }
+
+ for (auto& Obj : Batch)
+ {
+ MemoryView View = Obj.GetView();
+ asio::error_code Ec;
+ asio::write(m_Socket, asio::buffer(View.GetData(), View.GetSize()), Ec);
+ if (Ec)
+ {
+ m_Connected = false;
+ break; // drop remaining messages in batch
+ }
+ }
+ }
+ }
+
+ bool Connect()
+ {
+ try
+ {
+ asio::ip::tcp::resolver Resolver(m_IoContext);
+ auto Endpoints = Resolver.resolve(m_Host, std::to_string(m_Port));
+ asio::connect(m_Socket, Endpoints);
+ m_Connected = true;
+ return true;
+ }
+ catch (const std::exception&)
+ {
+ // Reset the socket for next attempt
+ m_Socket = asio::ip::tcp::socket(m_IoContext);
+ m_Connected = false;
+ return false;
+ }
+ }
+
+ // IO thread state (only accessed from m_IoThread)
+ asio::io_context m_IoContext;
+ asio::ip::tcp::socket m_Socket{m_IoContext};
+ bool m_Connected = false;
+
+ // Configuration (immutable after construction)
+ std::string m_Host;
+ uint16_t m_Port;
+ std::string m_Source;
+ uint32_t m_MaxQueueSize;
+
+ // Formatter (protected by RwLock since Log is called from multiple threads)
+ RwLock m_FormatterLock;
+ std::unique_ptr<logging::Formatter> m_Formatter;
+
+ // Queue shared between Log() callers and IO thread
+ std::mutex m_QueueMutex;
+ std::condition_variable m_QueueCv;
+ std::deque<CbObject> m_Queue;
+ std::atomic<uint32_t> m_DroppedMessages{0};
+ bool m_Stopping = false;
+ std::chrono::steady_clock::time_point m_DrainDeadline;
+
+ std::thread m_IoThread;
+};
+
+} // namespace zen