aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/process/asyncpipereader.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-24 15:47:23 +0100
committerGitHub Enterprise <[email protected]>2026-03-24 15:47:23 +0100
commit21c2abb1bde697c31bee562465cb986a0429a299 (patch)
tree3734d235e79a8fbed307ae5c248936d356553b61 /src/zenutil/process/asyncpipereader.cpp
parentv5.7.25 hotpatch (#874) (diff)
downloadzen-21c2abb1bde697c31bee562465cb986a0429a299.tar.xz
zen-21c2abb1bde697c31bee562465cb986a0429a299.zip
Subprocess Manager (#889)
Adds a `SubprocessManager` for managing child processes with ASIO-integrated async exit detection, stdout/stderr pipe capture, and periodic metrics sampling. Also introduces `ProcessGroup` for OS-backed process grouping (Windows JobObjects / POSIX process groups). ### SubprocessManager - Async process exit detection using platform-native mechanisms (Windows `object_handle`, Linux `pidfd_open`, macOS `kqueue EVFILT_PROC`) — no polling - Stdout/stderr capture via async pipe readers with per-process or default callbacks - Periodic round-robin metrics sampling (CPU, memory) across managed processes - Spawn, adopt, remove, kill, and enumerate managed processes ### ProcessGroup - OS-level process grouping: Windows JobObject (kill-on-close guarantee), POSIX `setpgid` (bulk signal delivery) - Atomic group kill via `TerminateJobObject` (Windows) or `kill(-pgid, sig)` (POSIX) - Per-group aggregate metrics and enumeration ### ProcessHandle improvements - Added explicit constructors from `int` (pid) and `void*` (native handle) - Added move constructor and move assignment operator ### ProcessMetricsTracker - Cross-platform process metrics (CPU time, working set, page faults) via `QueryProcessMetrics()` - ASIO timer-driven periodic sampling with configurable interval and batch size - Aggregate metrics across tracked processes ### Other changes - Fixed `zentest-appstub` writing a spurious `Versions` file to cwd on every invocation
Diffstat (limited to 'src/zenutil/process/asyncpipereader.cpp')
-rw-r--r--src/zenutil/process/asyncpipereader.cpp276
1 files changed, 276 insertions, 0 deletions
diff --git a/src/zenutil/process/asyncpipereader.cpp b/src/zenutil/process/asyncpipereader.cpp
new file mode 100644
index 000000000..2fdcda30d
--- /dev/null
+++ b/src/zenutil/process/asyncpipereader.cpp
@@ -0,0 +1,276 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "asyncpipereader.h"
+
+#include <zencore/logging.h>
+
+#include <array>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+# include <asio/io_context.hpp>
+# include <asio/windows/stream_handle.hpp>
+#else
+# include <fcntl.h>
+# include <unistd.h>
+# include <asio/io_context.hpp>
+# include <asio/posix/stream_descriptor.hpp>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+static constexpr size_t kReadBufferSize = 4096;
+
+// ============================================================================
+// POSIX: non-blocking pipe + stream_descriptor
+// ============================================================================
+
+#if !ZEN_PLATFORM_WINDOWS
+
+struct AsyncPipeReader::Impl
+{
+ asio::io_context& m_IoContext;
+ std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor;
+ std::function<void(std::string_view)> m_DataCallback;
+ std::function<void()> m_EofCallback;
+ std::array<char, kReadBufferSize> m_Buffer{};
+
+ explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+ ~Impl() { Stop(); }
+
+ void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+ {
+ m_DataCallback = std::move(DataCallback);
+ m_EofCallback = std::move(EofCallback);
+
+ int Fd = Pipe.ReadFd;
+
+ // Close the write end — child already has it
+ Pipe.CloseWriteEnd();
+
+ // Set non-blocking
+ int Flags = fcntl(Fd, F_GETFL, 0);
+ fcntl(Fd, F_SETFL, Flags | O_NONBLOCK);
+
+ // Take ownership of the fd. Detach it from StdoutPipeHandles so it
+ // doesn't get double-closed.
+ Pipe.ReadFd = -1;
+
+ m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, Fd);
+ EnqueueRead();
+ }
+
+ void Stop()
+ {
+ if (m_Descriptor)
+ {
+ asio::error_code Ec;
+ m_Descriptor->cancel(Ec);
+ m_Descriptor.reset();
+ }
+ }
+
+ void EnqueueRead()
+ {
+ if (!m_Descriptor)
+ {
+ return;
+ }
+
+ m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+ if (Ec)
+ {
+ if (Ec != asio::error::operation_aborted && m_EofCallback)
+ {
+ m_EofCallback();
+ }
+ return;
+ }
+
+ if (BytesRead > 0 && m_DataCallback)
+ {
+ m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+ }
+
+ EnqueueRead();
+ });
+ }
+};
+
+bool
+CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe)
+{
+ // On POSIX, regular pipes work fine with non-blocking I/O
+ return CreateStdoutPipe(OutPipe);
+}
+
+// ============================================================================
+// Windows: overlapped named pipe + asio::windows::stream_handle
+//
+// Anonymous pipes (CreatePipe) do not support overlapped I/O. Instead, we
+// create a named pipe pair with FILE_FLAG_OVERLAPPED on the read (server) end.
+// The write (client) end is inheritable and used as the child's stdout/stderr.
+//
+// Callers must use CreateOverlappedStdoutPipe() instead of CreateStdoutPipe()
+// so the pipe is overlapped from the start. Passing a non-overlapped anonymous
+// pipe to Start() will fail.
+// ============================================================================
+
+#else // ZEN_PLATFORM_WINDOWS
+
+static std::atomic<uint64_t> s_PipeSerial{0};
+
+bool
+CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe)
+{
+ // Generate a unique pipe name
+ uint64_t Serial = s_PipeSerial.fetch_add(1);
+ wchar_t PipeName[128];
+ swprintf_s(PipeName,
+ _countof(PipeName),
+ L"\\\\.\\pipe\\zen_async_%u_%llu",
+ GetCurrentProcessId(),
+ static_cast<unsigned long long>(Serial));
+
+ // Create the server (read) end with FILE_FLAG_OVERLAPPED
+ HANDLE ReadHandle = CreateNamedPipeW(PipeName,
+ PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_WAIT,
+ 1, // max instances
+ 0, // out buffer size
+ kReadBufferSize,
+ 0, // default timeout
+ nullptr);
+
+ if (ReadHandle == INVALID_HANDLE_VALUE)
+ {
+ ZEN_WARN("CreateNamedPipeW failed: {}", GetLastError());
+ return false;
+ }
+
+ // The read end should not be inherited by the child
+ SetHandleInformation(ReadHandle, HANDLE_FLAG_INHERIT, 0);
+
+ // Open the client (write) end — inheritable, for the child process
+ SECURITY_ATTRIBUTES Sa;
+ Sa.nLength = sizeof(Sa);
+ Sa.lpSecurityDescriptor = nullptr;
+ Sa.bInheritHandle = TRUE;
+
+ HANDLE WriteHandle = CreateFileW(PipeName,
+ GENERIC_WRITE,
+ 0, // no sharing
+ &Sa, // inheritable
+ OPEN_EXISTING,
+ 0, // no special flags on write end
+ nullptr);
+
+ if (WriteHandle == INVALID_HANDLE_VALUE)
+ {
+ DWORD Err = GetLastError();
+ CloseHandle(ReadHandle);
+ ZEN_WARN("CreateFileW for pipe client end failed: {}", Err);
+ return false;
+ }
+
+ OutPipe.ReadHandle = ReadHandle;
+ OutPipe.WriteHandle = WriteHandle;
+ return true;
+}
+
+struct AsyncPipeReader::Impl
+{
+ asio::io_context& m_IoContext;
+ std::unique_ptr<asio::windows::stream_handle> m_StreamHandle;
+ std::function<void(std::string_view)> m_DataCallback;
+ std::function<void()> m_EofCallback;
+ std::array<char, kReadBufferSize> m_Buffer{};
+
+ explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+ ~Impl() { Stop(); }
+
+ void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+ {
+ m_DataCallback = std::move(DataCallback);
+ m_EofCallback = std::move(EofCallback);
+
+ HANDLE ReadHandle = static_cast<HANDLE>(Pipe.ReadHandle);
+
+ // Close the write end — child already has it
+ Pipe.CloseWriteEnd();
+
+ // Take ownership of the read handle
+ Pipe.ReadHandle = nullptr;
+
+ m_StreamHandle = std::make_unique<asio::windows::stream_handle>(m_IoContext, ReadHandle);
+ EnqueueRead();
+ }
+
+ void Stop()
+ {
+ if (m_StreamHandle)
+ {
+ asio::error_code Ec;
+ m_StreamHandle->cancel(Ec);
+ m_StreamHandle.reset();
+ }
+ }
+
+ void EnqueueRead()
+ {
+ if (!m_StreamHandle)
+ {
+ return;
+ }
+
+ m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+ if (Ec)
+ {
+ if (Ec != asio::error::operation_aborted && m_EofCallback)
+ {
+ m_EofCallback();
+ }
+ return;
+ }
+
+ if (BytesRead > 0 && m_DataCallback)
+ {
+ m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+ }
+
+ EnqueueRead();
+ });
+ }
+};
+
+#endif
+
+// ============================================================================
+// Common wrapper
+// ============================================================================
+
+AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext))
+{
+}
+
+AsyncPipeReader::~AsyncPipeReader() = default;
+
+void
+AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+{
+ m_Impl->Start(std::move(Pipe), std::move(DataCallback), std::move(EofCallback));
+}
+
+void
+AsyncPipeReader::Stop()
+{
+ m_Impl->Stop();
+}
+
+} // namespace zen