aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/splitconsole
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-21 21:43:22 +0100
committerGitHub Enterprise <[email protected]>2026-03-21 21:43:22 +0100
commit14ca5b35d0fc477ba30f10b80f937b523fd7e930 (patch)
tree8aab2acfec8be1af4bf0dffdb4badc3b64bf8385 /src/zenutil/splitconsole
parentfix null stats provider crash when build store is not configured (#875) (diff)
downloadzen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.tar.xz
zen-14ca5b35d0fc477ba30f10b80f937b523fd7e930.zip
Interprocess pipe support (for stdout/stderr capture) (#866)
- **RAII pipe handles for child process stdout/stderr capture**: `StdoutPipeHandles` is now a proper RAII type with automatic cleanup, move semantics, and partial close support. This makes it safe to use pipes for capturing child process output without risking handle/fd leaks. - **Optional separate stderr pipe**: `CreateProcOptions` now accepts a `StderrPipe` field so callers can capture stdout and stderr independently. When null (default), stderr shares the stdout pipe as before. - **LogStreamListener with pluggable handler**: The TCP log stream listener accepts connections from remote processes and delivers parsed log lines through a `LogStreamHandler` interface, set dynamically via `SetHandler()`. This allows any client to receive log messages without depending on a specific console implementation. - **TcpLogStreamSink for zen::logging**: A logging sink that forwards log messages to a `LogStreamListener` over TCP, using the native `zen::logging::Sink` infrastructure with proper thread-safe synchronization. - **Reliable child process exit codes on Linux**: `waitpid` result handling is fixed so `ProcessHandle::GetExitCode()` returns the real exit code. `ProcessHandle::Reset()` reaps zombies directly, replacing the global `IgnoreChildSignals()` which prevented exit code collection entirely. Also fixes a TOCTOU race in `ProcessHandle::Wait()` on Linux/Mac. - **Pipe capture test suite**: Tests covering stdout/stderr capture via pipes (both shared and separate modes), RAII cleanup, move semantics, and exit code propagation using `zentest-appstub` as the child process. - **Service command integration tests**: Shell-based integration tests for `zen service` covering the full lifecycle (install, status, start, stop, uninstall) on all three platforms — Linux (systemd), macOS (launchd), and Windows (SCM via PowerShell). - **Test script reorganization**: Platform-specific test scripts moved from `scripts/test_scripts/` into `scripts/test_linux/`, `test_mac/`, and `test_windows/`.
Diffstat (limited to 'src/zenutil/splitconsole')
-rw-r--r--src/zenutil/splitconsole/logstreamlistener.cpp270
1 files changed, 270 insertions, 0 deletions
diff --git a/src/zenutil/splitconsole/logstreamlistener.cpp b/src/zenutil/splitconsole/logstreamlistener.cpp
new file mode 100644
index 000000000..9f1d1a02c
--- /dev/null
+++ b/src/zenutil/splitconsole/logstreamlistener.cpp
@@ -0,0 +1,270 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/splitconsole/logstreamlistener.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <atomic>
+#include <vector>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+// LogStreamSession — reads CbObject-framed messages from a single TCP connection
+
+class LogStreamSession : public std::enable_shared_from_this<LogStreamSession>
+{
+public:
+ LogStreamSession(asio::ip::tcp::socket Socket, std::atomic<LogStreamHandler*>& Handler)
+ : m_Socket(std::move(Socket))
+ , m_Handler(Handler)
+ {
+ }
+
+ void Start() { DoRead(); }
+
+private:
+ void DoRead()
+ {
+ auto Self = shared_from_this();
+ m_Socket.async_read_some(asio::buffer(m_ReadBuf.data() + m_BufferUsed, m_ReadBuf.size() - m_BufferUsed),
+ [Self](const asio::error_code& Ec, std::size_t BytesRead) {
+ if (Ec)
+ {
+ return; // connection closed or error — session ends
+ }
+ Self->m_BufferUsed += BytesRead;
+ Self->ProcessBuffer();
+ Self->DoRead();
+ });
+ }
+
+ void ProcessBuffer()
+ {
+ // Try to consume as many complete CbObject messages as possible
+ while (m_BufferUsed > 0)
+ {
+ MemoryView View = MakeMemoryView(m_ReadBuf.data(), m_BufferUsed);
+ CbFieldType Type;
+ uint64_t Size = 0;
+
+ if (!TryMeasureCompactBinary(View, Type, Size))
+ {
+ break; // need more data
+ }
+
+ if (Size > m_BufferUsed)
+ {
+ break; // need more data
+ }
+
+ // Parse the CbObject
+ CbObject Obj = CbObject(SharedBuffer::Clone(MakeMemoryView(m_ReadBuf.data(), Size)));
+
+ std::string_view Text = Obj["text"].AsString();
+ std::string_view Source = Obj["source"].AsString();
+
+ // Split multi-line messages into individual AppendLogLine calls so that
+ // each line gets its own row in the SplitConsole ring buffer.
+ while (!Text.empty())
+ {
+ std::string_view Line = Text;
+ auto Pos = Text.find('\n');
+ if (Pos != std::string_view::npos)
+ {
+ Line = Text.substr(0, Pos);
+ Text.remove_prefix(Pos + 1);
+ }
+ else
+ {
+ Text = {};
+ }
+
+ // Strip trailing CR from CRLF
+ if (!Line.empty() && Line.back() == '\r')
+ {
+ Line.remove_suffix(1);
+ }
+
+ if (Line.empty())
+ {
+ continue;
+ }
+
+ LogStreamHandler* Handler = m_Handler.load(std::memory_order_acquire);
+ if (Handler)
+ {
+ if (!Source.empty())
+ {
+ Handler->AppendLogLine(fmt::format("[{}] {}", Source, Line));
+ }
+ else
+ {
+ Handler->AppendLogLine(std::string(Line));
+ }
+ }
+ }
+
+ // Remove consumed bytes from buffer
+ std::size_t Consumed = static_cast<std::size_t>(Size);
+ std::memmove(m_ReadBuf.data(), m_ReadBuf.data() + Consumed, m_BufferUsed - Consumed);
+ m_BufferUsed -= Consumed;
+ }
+
+ // If buffer is full and we can't parse a message, the message is too large — drop connection
+ if (m_BufferUsed == m_ReadBuf.size())
+ {
+ ZEN_WARN("LogStreamSession: buffer full with no complete message, dropping connection");
+ asio::error_code Ec;
+ m_Socket.close(Ec);
+ m_BufferUsed = 0;
+ }
+ }
+
+ asio::ip::tcp::socket m_Socket;
+ std::atomic<LogStreamHandler*>& m_Handler;
+ std::array<uint8_t, 65536> m_ReadBuf{};
+ std::size_t m_BufferUsed = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+// LogStreamListener::Impl
+
+struct LogStreamListener::Impl
+{
+ // Owned io_context mode — creates and runs its own thread
+ explicit Impl(uint16_t Port) : m_OwnedIoContext(std::make_unique<asio::io_context>()), m_Acceptor(*m_OwnedIoContext)
+ {
+ SetupAcceptor(Port);
+ m_IoThread = std::thread([this]() {
+ zen::SetCurrentThreadName("LogStreamIO");
+ m_OwnedIoContext->run();
+ });
+ }
+
+ // External io_context mode — caller drives the io_context
+ Impl(asio::io_context& IoContext, uint16_t Port) : m_Acceptor(IoContext) { SetupAcceptor(Port); }
+
+ ~Impl() { Shutdown(); }
+
+ void Shutdown()
+ {
+ if (m_Stopped.exchange(true))
+ {
+ return;
+ }
+
+ asio::error_code Ec;
+ m_Acceptor.close(Ec);
+
+ if (m_OwnedIoContext)
+ {
+ m_OwnedIoContext->stop();
+ }
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ }
+
+ uint16_t GetPort() const { return m_Port; }
+
+ void SetHandler(LogStreamHandler* Handler) { m_Handler.store(Handler, std::memory_order_release); }
+
+private:
+ void SetupAcceptor(uint16_t Port)
+ {
+ // Try dual-stack IPv6 first (accepts both IPv4 and IPv6), fall back to IPv4-only
+ asio::error_code Ec;
+ m_Acceptor.open(asio::ip::tcp::v6(), Ec);
+ if (!Ec)
+ {
+ m_Acceptor.set_option(asio::ip::v6_only(false), Ec);
+ if (!Ec)
+ {
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v6(), Port), Ec);
+ }
+ }
+
+ if (Ec)
+ {
+ // Fall back to IPv4-only
+ if (m_Acceptor.is_open())
+ {
+ m_Acceptor.close();
+ }
+ m_Acceptor.open(asio::ip::tcp::v4());
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), Port));
+ }
+
+ m_Acceptor.listen();
+ m_Port = m_Acceptor.local_endpoint().port();
+ StartAccept();
+ }
+
+ void StartAccept()
+ {
+ m_Acceptor.async_accept([this](const asio::error_code& Ec, asio::ip::tcp::socket Socket) {
+ if (Ec)
+ {
+ return; // acceptor closed
+ }
+
+ auto Session = std::make_shared<LogStreamSession>(std::move(Socket), m_Handler);
+ Session->Start();
+
+ if (!m_Stopped.load())
+ {
+ StartAccept();
+ }
+ });
+ }
+
+ std::atomic<LogStreamHandler*> m_Handler{nullptr};
+ std::unique_ptr<asio::io_context> m_OwnedIoContext; // null when using external io_context
+ asio::ip::tcp::acceptor m_Acceptor;
+ std::thread m_IoThread;
+ uint16_t m_Port = 0;
+ std::atomic<bool> m_Stopped{false};
+};
+
+//////////////////////////////////////////////////////////////////////////
+// LogStreamListener
+
+LogStreamListener::LogStreamListener(uint16_t Port) : m_Impl(std::make_unique<Impl>(Port))
+{
+}
+
+LogStreamListener::LogStreamListener(asio::io_context& IoContext, uint16_t Port) : m_Impl(std::make_unique<Impl>(IoContext, Port))
+{
+}
+
+LogStreamListener::~LogStreamListener() = default;
+
+void
+LogStreamListener::SetHandler(LogStreamHandler* Handler)
+{
+ m_Impl->SetHandler(Handler);
+}
+
+uint16_t
+LogStreamListener::GetPort() const
+{
+ return m_Impl->GetPort();
+}
+
+void
+LogStreamListener::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+
+} // namespace zen