diff options
| author | Stefan Boberg <[email protected]> | 2026-03-21 21:43:22 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-21 21:43:22 +0100 |
| commit | 14ca5b35d0fc477ba30f10b80f937b523fd7e930 (patch) | |
| tree | 8aab2acfec8be1af4bf0dffdb4badc3b64bf8385 /src/zenutil/splitconsole | |
| parent | fix null stats provider crash when build store is not configured (#875) (diff) | |
| download | zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.tar.xz zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.zip | |
Interprocess pipe support (for stdout/stderr capture) (#866)
- **RAII pipe handles for child process stdout/stderr capture**: `StdoutPipeHandles` is now a proper RAII type with automatic cleanup, move semantics, and partial close support. This makes it safe to use pipes for capturing child process output without risking handle/fd leaks.
- **Optional separate stderr pipe**: `CreateProcOptions` now accepts a `StderrPipe` field so callers can capture stdout and stderr independently. When null (default), stderr shares the stdout pipe as before.
- **LogStreamListener with pluggable handler**: The TCP log stream listener accepts connections from remote processes and delivers parsed log lines through a `LogStreamHandler` interface, set dynamically via `SetHandler()`. This allows any client to receive log messages without depending on a specific console implementation.
- **TcpLogStreamSink for zen::logging**: A logging sink that forwards log messages to a `LogStreamListener` over TCP, using the native `zen::logging::Sink` infrastructure with proper thread-safe synchronization.
- **Reliable child process exit codes on Linux**: `waitpid` result handling is fixed so `ProcessHandle::GetExitCode()` returns the real exit code. `ProcessHandle::Reset()` reaps zombies directly, replacing the global `IgnoreChildSignals()` which prevented exit code collection entirely. Also fixes a TOCTOU race in `ProcessHandle::Wait()` on Linux/Mac.
- **Pipe capture test suite**: Tests covering stdout/stderr capture via pipes (both shared and separate modes), RAII cleanup, move semantics, and exit code propagation using `zentest-appstub` as the child process.
- **Service command integration tests**: Shell-based integration tests for `zen service` covering the full lifecycle (install, status, start, stop, uninstall) on all three platforms — Linux (systemd), macOS (launchd), and Windows (SCM via PowerShell).
- **Test script reorganization**: Platform-specific test scripts moved from `scripts/test_scripts/` into `scripts/test_linux/`, `test_mac/`, and `test_windows/`.
Diffstat (limited to 'src/zenutil/splitconsole')
| -rw-r--r-- | src/zenutil/splitconsole/logstreamlistener.cpp | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp new file mode 100644 index 000000000..9f1d1a02c --- /dev/null +++ b/src/zenutil/splitconsole/logstreamlistener.cpp @@ -0,0 +1,270 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/splitconsole/logstreamlistener.h> + +#include <zencore/compactbinary.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/thread.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <vector> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// LogStreamSession — reads CbObject-framed messages from a single TCP connection + +class LogStreamSession : public std::enable_shared_from_this<LogStreamSession> +{ +public: + LogStreamSession(asio::ip::tcp::socket Socket, std::atomic<LogStreamHandler*>& Handler) + : m_Socket(std::move(Socket)) + , m_Handler(Handler) + { + } + + void Start() { DoRead(); } + +private: + void DoRead() + { + auto Self = shared_from_this(); + m_Socket.async_read_some(asio::buffer(m_ReadBuf.data() + m_BufferUsed, m_ReadBuf.size() - m_BufferUsed), + [Self](const asio::error_code& Ec, std::size_t BytesRead) { + if (Ec) + { + return; // connection closed or error — session ends + } + Self->m_BufferUsed += BytesRead; + Self->ProcessBuffer(); + Self->DoRead(); + }); + } + + void ProcessBuffer() + { + // Try to consume as many complete CbObject messages as possible + while (m_BufferUsed > 0) + { + MemoryView View = MakeMemoryView(m_ReadBuf.data(), m_BufferUsed); + CbFieldType Type; + uint64_t Size = 0; + + if (!TryMeasureCompactBinary(View, Type, Size)) + { + break; // need more data + } + + if (Size > m_BufferUsed) + { + break; // need more data + } + + // Parse the CbObject + CbObject Obj = CbObject(SharedBuffer::Clone(MakeMemoryView(m_ReadBuf.data(), Size))); + + std::string_view Text = Obj["text"].AsString(); + std::string_view Source = Obj["source"].AsString(); + + // Split multi-line messages into individual AppendLogLine calls so that + // each line gets its own row in the SplitConsole ring buffer. + while (!Text.empty()) + { + std::string_view Line = Text; + auto Pos = Text.find('\n'); + if (Pos != std::string_view::npos) + { + Line = Text.substr(0, Pos); + Text.remove_prefix(Pos + 1); + } + else + { + Text = {}; + } + + // Strip trailing CR from CRLF + if (!Line.empty() && Line.back() == '\r') + { + Line.remove_suffix(1); + } + + if (Line.empty()) + { + continue; + } + + LogStreamHandler* Handler = m_Handler.load(std::memory_order_acquire); + if (Handler) + { + if (!Source.empty()) + { + Handler->AppendLogLine(fmt::format("[{}] {}", Source, Line)); + } + else + { + Handler->AppendLogLine(std::string(Line)); + } + } + } + + // Remove consumed bytes from buffer + std::size_t Consumed = static_cast<std::size_t>(Size); + std::memmove(m_ReadBuf.data(), m_ReadBuf.data() + Consumed, m_BufferUsed - Consumed); + m_BufferUsed -= Consumed; + } + + // If buffer is full and we can't parse a message, the message is too large — drop connection + if (m_BufferUsed == m_ReadBuf.size()) + { + ZEN_WARN("LogStreamSession: buffer full with no complete message, dropping connection"); + asio::error_code Ec; + m_Socket.close(Ec); + m_BufferUsed = 0; + } + } + + asio::ip::tcp::socket m_Socket; + std::atomic<LogStreamHandler*>& m_Handler; + std::array<uint8_t, 65536> m_ReadBuf{}; + std::size_t m_BufferUsed = 0; +}; + +////////////////////////////////////////////////////////////////////////// +// LogStreamListener::Impl + +struct LogStreamListener::Impl +{ + // Owned io_context mode — creates and runs its own thread + explicit Impl(uint16_t Port) : m_OwnedIoContext(std::make_unique<asio::io_context>()), m_Acceptor(*m_OwnedIoContext) + { + SetupAcceptor(Port); + m_IoThread = std::thread([this]() { + zen::SetCurrentThreadName("LogStreamIO"); + m_OwnedIoContext->run(); + }); + } + + // External io_context mode — caller drives the io_context + Impl(asio::io_context& IoContext, uint16_t Port) : m_Acceptor(IoContext) { SetupAcceptor(Port); } + + ~Impl() { Shutdown(); } + + void Shutdown() + { + if (m_Stopped.exchange(true)) + { + return; + } + + asio::error_code Ec; + m_Acceptor.close(Ec); + + if (m_OwnedIoContext) + { + m_OwnedIoContext->stop(); + } + + if (m_IoThread.joinable()) + { + m_IoThread.join(); + } + } + + uint16_t GetPort() const { return m_Port; } + + void SetHandler(LogStreamHandler* Handler) { m_Handler.store(Handler, std::memory_order_release); } + +private: + void SetupAcceptor(uint16_t Port) + { + // Try dual-stack IPv6 first (accepts both IPv4 and IPv6), fall back to IPv4-only + asio::error_code Ec; + m_Acceptor.open(asio::ip::tcp::v6(), Ec); + if (!Ec) + { + m_Acceptor.set_option(asio::ip::v6_only(false), Ec); + if (!Ec) + { + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v6(), Port), Ec); + } + } + + if (Ec) + { + // Fall back to IPv4-only + if (m_Acceptor.is_open()) + { + m_Acceptor.close(); + } + m_Acceptor.open(asio::ip::tcp::v4()); + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), Port)); + } + + m_Acceptor.listen(); + m_Port = m_Acceptor.local_endpoint().port(); + StartAccept(); + } + + void StartAccept() + { + m_Acceptor.async_accept([this](const asio::error_code& Ec, asio::ip::tcp::socket Socket) { + if (Ec) + { + return; // acceptor closed + } + + auto Session = std::make_shared<LogStreamSession>(std::move(Socket), m_Handler); + Session->Start(); + + if (!m_Stopped.load()) + { + StartAccept(); + } + }); + } + + std::atomic<LogStreamHandler*> m_Handler{nullptr}; + std::unique_ptr<asio::io_context> m_OwnedIoContext; // null when using external io_context + asio::ip::tcp::acceptor m_Acceptor; + std::thread m_IoThread; + uint16_t m_Port = 0; + std::atomic<bool> m_Stopped{false}; +}; + +////////////////////////////////////////////////////////////////////////// +// LogStreamListener + +LogStreamListener::LogStreamListener(uint16_t Port) : m_Impl(std::make_unique<Impl>(Port)) +{ +} + +LogStreamListener::LogStreamListener(asio::io_context& IoContext, uint16_t Port) : m_Impl(std::make_unique<Impl>(IoContext, Port)) +{ +} + +LogStreamListener::~LogStreamListener() = default; + +void +LogStreamListener::SetHandler(LogStreamHandler* Handler) +{ + m_Impl->SetHandler(Handler); +} + +uint16_t +LogStreamListener::GetPort() const +{ + return m_Impl->GetPort(); +} + +void +LogStreamListener::Shutdown() +{ + m_Impl->Shutdown(); +} + +} // namespace zen |