aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/process
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-24 15:47:23 +0100
committerGitHub Enterprise <[email protected]>2026-03-24 15:47:23 +0100
commit21c2abb1bde697c31bee562465cb986a0429a299 (patch)
tree3734d235e79a8fbed307ae5c248936d356553b61 /src/zenutil/process
parentv5.7.25 hotpatch (#874) (diff)
downloadzen-21c2abb1bde697c31bee562465cb986a0429a299.tar.xz
zen-21c2abb1bde697c31bee562465cb986a0429a299.zip
Subprocess Manager (#889)
Adds a `SubprocessManager` for managing child processes with ASIO-integrated async exit detection, stdout/stderr pipe capture, and periodic metrics sampling. Also introduces `ProcessGroup` for OS-backed process grouping (Windows JobObjects / POSIX process groups). ### SubprocessManager - Async process exit detection using platform-native mechanisms (Windows `object_handle`, Linux `pidfd_open`, macOS `kqueue EVFILT_PROC`) — no polling - Stdout/stderr capture via async pipe readers with per-process or default callbacks - Periodic round-robin metrics sampling (CPU, memory) across managed processes - Spawn, adopt, remove, kill, and enumerate managed processes ### ProcessGroup - OS-level process grouping: Windows JobObject (kill-on-close guarantee), POSIX `setpgid` (bulk signal delivery) - Atomic group kill via `TerminateJobObject` (Windows) or `kill(-pgid, sig)` (POSIX) - Per-group aggregate metrics and enumeration ### ProcessHandle improvements - Added explicit constructors from `int` (pid) and `void*` (native handle) - Added move constructor and move assignment operator ### ProcessMetricsTracker - Cross-platform process metrics (CPU time, working set, page faults) via `QueryProcessMetrics()` - ASIO timer-driven periodic sampling with configurable interval and batch size - Aggregate metrics across tracked processes ### Other changes - Fixed `zentest-appstub` writing a spurious `Versions` file to cwd on every invocation
Diffstat (limited to 'src/zenutil/process')
-rw-r--r--src/zenutil/process/asyncpipereader.cpp276
-rw-r--r--src/zenutil/process/asyncpipereader.h62
-rw-r--r--src/zenutil/process/exitwatcher.cpp294
-rw-r--r--src/zenutil/process/exitwatcher.h48
-rw-r--r--src/zenutil/process/subprocessmanager.cpp1811
5 files changed, 2491 insertions, 0 deletions
diff --git a/src/zenutil/process/asyncpipereader.cpp b/src/zenutil/process/asyncpipereader.cpp
new file mode 100644
index 000000000..2fdcda30d
--- /dev/null
+++ b/src/zenutil/process/asyncpipereader.cpp
@@ -0,0 +1,276 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "asyncpipereader.h"
+
+#include <zencore/logging.h>
+
+#include <array>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+# include <asio/io_context.hpp>
+# include <asio/windows/stream_handle.hpp>
+#else
+# include <fcntl.h>
+# include <unistd.h>
+# include <asio/io_context.hpp>
+# include <asio/posix/stream_descriptor.hpp>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+static constexpr size_t kReadBufferSize = 4096;
+
+// ============================================================================
+// POSIX: non-blocking pipe + stream_descriptor
+// ============================================================================
+
+#if !ZEN_PLATFORM_WINDOWS
+
+// POSIX implementation: wraps the pipe's read fd in an asio
+// posix::stream_descriptor and runs a self-rearming async_read_some loop
+// on the owning io_context.
+struct AsyncPipeReader::Impl
+{
+    asio::io_context& m_IoContext;
+    std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor;
+    std::function<void(std::string_view)> m_DataCallback; // per-chunk sink, invoked on the io_context
+    std::function<void()> m_EofCallback;                  // invoked once on read error / EOF
+    std::array<char, kReadBufferSize> m_Buffer{};
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+    ~Impl() { Stop(); }
+
+    // Take ownership of the pipe's read end and start the async read loop.
+    // The write end is closed here because the child process already holds
+    // its own duplicate of it.
+    void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+    {
+        m_DataCallback = std::move(DataCallback);
+        m_EofCallback = std::move(EofCallback);
+
+        int Fd = Pipe.ReadFd;
+
+        // Close the write end — child already has it
+        Pipe.CloseWriteEnd();
+
+        // Set non-blocking
+        // NOTE(review): fcntl() return values are not checked; on failure the
+        // fd stays blocking — confirm that is acceptable here.
+        int Flags = fcntl(Fd, F_GETFL, 0);
+        fcntl(Fd, F_SETFL, Flags | O_NONBLOCK);
+
+        // Take ownership of the fd. Detach it from StdoutPipeHandles so it
+        // doesn't get double-closed.
+        Pipe.ReadFd = -1;
+
+        m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, Fd);
+        EnqueueRead();
+    }
+
+    // Cancel any outstanding read and close the fd (stream_descriptor closes
+    // its fd on destruction).
+    void Stop()
+    {
+        if (m_Descriptor)
+        {
+            asio::error_code Ec;
+            m_Descriptor->cancel(Ec);
+            m_Descriptor.reset();
+        }
+    }
+
+    // Arm one async read; the completion handler re-arms itself until the
+    // read fails (EOF / broken pipe => EofCallback) or is cancelled.
+    // NOTE(review): the handler captures `this` raw — if the Impl is
+    // destroyed while a completion is already queued on the io_context, the
+    // handler would run on a dangling pointer. This relies on the io_context
+    // being drained/stopped before readers are destroyed; confirm callers
+    // guarantee that ordering.
+    void EnqueueRead()
+    {
+        if (!m_Descriptor)
+        {
+            return;
+        }
+
+        m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+            if (Ec)
+            {
+                // operation_aborted means Stop()/cancel: suppress the EOF signal.
+                if (Ec != asio::error::operation_aborted && m_EofCallback)
+                {
+                    m_EofCallback();
+                }
+                return;
+            }
+
+            if (BytesRead > 0 && m_DataCallback)
+            {
+                m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+            }
+
+            EnqueueRead();
+        });
+    }
+};
+
+// POSIX has no overlapped-I/O distinction: an ordinary pipe from
+// CreateStdoutPipe() works with non-blocking async reads as-is, so this is
+// a straight pass-through.
+bool
+CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe)
+{
+    // On POSIX, regular pipes work fine with non-blocking I/O
+    return CreateStdoutPipe(OutPipe);
+}
+
+// ============================================================================
+// Windows: overlapped named pipe + asio::windows::stream_handle
+//
+// Anonymous pipes (CreatePipe) do not support overlapped I/O. Instead, we
+// create a named pipe pair with FILE_FLAG_OVERLAPPED on the read (server) end.
+// The write (client) end is inheritable and used as the child's stdout/stderr.
+//
+// Callers must use CreateOverlappedStdoutPipe() instead of CreateStdoutPipe()
+// so the pipe is overlapped from the start. Passing a non-overlapped anonymous
+// pipe to Start() will fail.
+// ============================================================================
+
+#else // ZEN_PLATFORM_WINDOWS
+
+// Monotonic serial so each pipe name is unique within this process; combined
+// with the pid it is unique system-wide.
+static std::atomic<uint64_t> s_PipeSerial{0};
+
+// Create a named-pipe pair suitable for overlapped (async) I/O:
+//   - server/read end:  PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, not inherited
+//   - client/write end: inheritable, handed to the child as stdout/stderr
+// Returns false (after logging) if either end cannot be created.
+bool
+CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe)
+{
+    // Generate a unique pipe name
+    // NOTE(review): GetCurrentProcessId() returns a DWORD (unsigned long on
+    // Windows) formatted with %u — works on LLP64 Windows, but %lu would be
+    // strictly matching.
+    uint64_t Serial = s_PipeSerial.fetch_add(1);
+    wchar_t PipeName[128];
+    swprintf_s(PipeName,
+               _countof(PipeName),
+               L"\\\\.\\pipe\\zen_async_%u_%llu",
+               GetCurrentProcessId(),
+               static_cast<unsigned long long>(Serial));
+
+    // Create the server (read) end with FILE_FLAG_OVERLAPPED
+    HANDLE ReadHandle = CreateNamedPipeW(PipeName,
+                                         PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
+                                         PIPE_TYPE_BYTE | PIPE_WAIT,
+                                         1,               // max instances
+                                         0,               // out buffer size
+                                         kReadBufferSize, // in buffer size (child -> us)
+                                         0,               // default timeout
+                                         nullptr);
+
+    if (ReadHandle == INVALID_HANDLE_VALUE)
+    {
+        ZEN_WARN("CreateNamedPipeW failed: {}", GetLastError());
+        return false;
+    }
+
+    // The read end should not be inherited by the child
+    SetHandleInformation(ReadHandle, HANDLE_FLAG_INHERIT, 0);
+
+    // Open the client (write) end — inheritable, for the child process
+    SECURITY_ATTRIBUTES Sa;
+    Sa.nLength = sizeof(Sa);
+    Sa.lpSecurityDescriptor = nullptr;
+    Sa.bInheritHandle = TRUE;
+
+    HANDLE WriteHandle = CreateFileW(PipeName,
+                                     GENERIC_WRITE,
+                                     0,   // no sharing
+                                     &Sa, // inheritable
+                                     OPEN_EXISTING,
+                                     0,   // no special flags on write end
+                                     nullptr);
+
+    if (WriteHandle == INVALID_HANDLE_VALUE)
+    {
+        // Clean up the orphaned server end before reporting failure.
+        DWORD Err = GetLastError();
+        CloseHandle(ReadHandle);
+        ZEN_WARN("CreateFileW for pipe client end failed: {}", Err);
+        return false;
+    }
+
+    OutPipe.ReadHandle = ReadHandle;
+    OutPipe.WriteHandle = WriteHandle;
+    return true;
+}
+
+// Windows implementation: wraps the overlapped read handle in an
+// asio::windows::stream_handle and runs a self-rearming async_read_some loop.
+// Requires a pipe created by CreateOverlappedStdoutPipe() above.
+struct AsyncPipeReader::Impl
+{
+    asio::io_context& m_IoContext;
+    std::unique_ptr<asio::windows::stream_handle> m_StreamHandle;
+    std::function<void(std::string_view)> m_DataCallback; // per-chunk sink, invoked on the io_context
+    std::function<void()> m_EofCallback;                  // invoked once on read error / EOF
+    std::array<char, kReadBufferSize> m_Buffer{};
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+    ~Impl() { Stop(); }
+
+    // Take ownership of the pipe's read handle and start the async read loop.
+    // The write end is closed here because the child process already holds
+    // its own inherited duplicate of it.
+    void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+    {
+        m_DataCallback = std::move(DataCallback);
+        m_EofCallback = std::move(EofCallback);
+
+        HANDLE ReadHandle = static_cast<HANDLE>(Pipe.ReadHandle);
+
+        // Close the write end — child already has it
+        Pipe.CloseWriteEnd();
+
+        // Take ownership of the read handle (detach so it isn't double-closed)
+        Pipe.ReadHandle = nullptr;
+
+        // stream_handle closes the handle on destruction.
+        m_StreamHandle = std::make_unique<asio::windows::stream_handle>(m_IoContext, ReadHandle);
+        EnqueueRead();
+    }
+
+    // Cancel any outstanding read and close the handle.
+    void Stop()
+    {
+        if (m_StreamHandle)
+        {
+            asio::error_code Ec;
+            m_StreamHandle->cancel(Ec);
+            m_StreamHandle.reset();
+        }
+    }
+
+    // Arm one async read; the completion handler re-arms itself until the
+    // read fails (EOF / broken pipe => EofCallback) or is cancelled.
+    // NOTE(review): handler captures `this` raw — same lifetime caveat as the
+    // POSIX variant: the io_context must be drained before the Impl dies.
+    void EnqueueRead()
+    {
+        if (!m_StreamHandle)
+        {
+            return;
+        }
+
+        m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+            if (Ec)
+            {
+                // operation_aborted means Stop()/cancel: suppress the EOF signal.
+                if (Ec != asio::error::operation_aborted && m_EofCallback)
+                {
+                    m_EofCallback();
+                }
+                return;
+            }
+
+            if (BytesRead > 0 && m_DataCallback)
+            {
+                m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+            }
+
+            EnqueueRead();
+        });
+    }
+};
+
+#endif
+
+// ============================================================================
+// Common wrapper
+// ============================================================================
+
+// Thin pimpl forwarding layer — all real work happens in the
+// platform-specific Impl selected by the preprocessor above.
+AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext))
+{
+}
+
+// Defaulted here (not in the header) so ~unique_ptr<Impl> sees the complete type.
+AsyncPipeReader::~AsyncPipeReader() = default;
+
+void
+AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)
+{
+    m_Impl->Start(std::move(Pipe), std::move(DataCallback), std::move(EofCallback));
+}
+
+void
+AsyncPipeReader::Stop()
+{
+    m_Impl->Stop();
+}
+
+} // namespace zen
diff --git a/src/zenutil/process/asyncpipereader.h b/src/zenutil/process/asyncpipereader.h
new file mode 100644
index 000000000..ad2ff8455
--- /dev/null
+++ b/src/zenutil/process/asyncpipereader.h
@@ -0,0 +1,62 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/process.h>
+#include <zencore/zencore.h>
+
+#include <functional>
+#include <memory>
+#include <string_view>
+
+namespace asio {
+class io_context;
+}
+
+namespace zen {
+
+/// Create an overlapped pipe pair suitable for async I/O on Windows.
+///
+/// Unlike CreateStdoutPipe() (which creates anonymous non-overlapped pipes),
+/// this creates a named pipe with FILE_FLAG_OVERLAPPED on the read end, so it
+/// can be used with asio::windows::stream_handle for fully async reads.
+/// The write end is inheritable and suitable for child process redirection.
+///
+/// On non-Windows platforms this simply delegates to CreateStdoutPipe().
+bool CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe);
+
+/// Async pipe reader for capturing child process stdout/stderr.
+///
+/// Takes ownership of a pipe's read end and reads asynchronously:
+/// Linux/macOS: non-blocking fd + asio::posix::stream_descriptor
+/// Windows: overlapped named pipe + asio::windows::stream_handle
+///
+/// On Windows the pipe must have been created with CreateOverlappedStdoutPipe()
+/// for async I/O to work. Pipes from CreateStdoutPipe() will fail.
+///
+/// DataCallback is invoked for each chunk read (on the io_context).
+/// EofCallback is invoked when the pipe closes (child exited or pipe broken).
+class AsyncPipeReader
+{
+public:
+    explicit AsyncPipeReader(asio::io_context& IoContext);
+    ~AsyncPipeReader();
+
+    // Non-copyable: the Impl owns an OS handle with in-flight async operations.
+    AsyncPipeReader(const AsyncPipeReader&) = delete;
+    AsyncPipeReader& operator=(const AsyncPipeReader&) = delete;
+
+    /// Take ownership of the pipe read-end and start async reading.
+    /// The write end is closed immediately (caller should have already launched
+    /// the child process). DataCallback receives raw chunks. EofCallback fires
+    /// once when the pipe reaches EOF.
+    void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view Data)> DataCallback, std::function<void()> EofCallback);
+
+    /// Stop reading and close the pipe. Callbacks will not fire after this returns.
+    /// NOTE(review): with a multi-threaded io_context a handler could already be
+    /// mid-execution when Stop() returns — confirm single-threaded io_context use.
+    void Stop();
+
+private:
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zenutil/process/exitwatcher.cpp b/src/zenutil/process/exitwatcher.cpp
new file mode 100644
index 000000000..cef31ebca
--- /dev/null
+++ b/src/zenutil/process/exitwatcher.cpp
@@ -0,0 +1,294 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "exitwatcher.h"
+
+#include <zencore/logging.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+# include <asio/io_context.hpp>
+# include <asio/windows/object_handle.hpp>
+#elif ZEN_PLATFORM_LINUX
+# include <sys/syscall.h>
+# include <sys/wait.h>
+# include <unistd.h>
+# include <asio/io_context.hpp>
+# include <asio/posix/stream_descriptor.hpp>
+
+# ifndef SYS_pidfd_open
+# define SYS_pidfd_open 434 // x86_64
+# endif
+#elif ZEN_PLATFORM_MAC
+# include <sys/event.h>
+# include <sys/wait.h>
+# include <unistd.h>
+# include <asio/io_context.hpp>
+# include <asio/posix/stream_descriptor.hpp>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+// ============================================================================
+// Linux: pidfd_open + stream_descriptor
+// ============================================================================
+
+#if ZEN_PLATFORM_LINUX
+
+// Linux implementation: pidfd_open() yields an fd that becomes readable when
+// the watched process exits; asio's async_wait delivers that readiness on the
+// io_context without any polling.
+struct ProcessExitWatcher::Impl
+{
+    asio::io_context& m_IoContext;
+    std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor;
+    int m_PidFd = -1; // owned: either by m_Descriptor or closed in Cancel()
+    int m_Pid = 0;
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+    ~Impl() { Cancel(); }
+
+    void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit)
+    {
+        m_Pid = Handle.Pid();
+
+        // pidfd_open returns an fd that becomes readable when the process exits.
+        // Available since Linux 5.3.
+        // NOTE(review): on failure (e.g. pre-5.3 kernel) this only warns and
+        // returns — the exit callback then never fires. Confirm callers can
+        // tolerate a silent no-op watcher.
+        m_PidFd = static_cast<int>(syscall(SYS_pidfd_open, m_Pid, 0));
+        if (m_PidFd < 0)
+        {
+            ZEN_WARN("pidfd_open failed for pid {}: {}", m_Pid, strerror(errno));
+            return;
+        }
+
+        m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, m_PidFd);
+
+        m_Descriptor->async_wait(asio::posix::stream_descriptor::wait_read,
+                                 [this, Callback = std::move(OnExit)](const asio::error_code& Ec) {
+                                     if (Ec)
+                                     {
+                                         return; // Cancelled or error
+                                     }
+
+                                     // -1 is reported when the status cannot be collected
+                                     // (e.g. the child was already reaped elsewhere).
+                                     int ExitCode = -1;
+                                     int Status = 0;
+                                     // The pidfd told us the process exited. Reap it with waitpid.
+                                     if (waitpid(m_Pid, &Status, WNOHANG) > 0)
+                                     {
+                                         if (WIFEXITED(Status))
+                                         {
+                                             ExitCode = WEXITSTATUS(Status);
+                                         }
+                                         else if (WIFSIGNALED(Status))
+                                         {
+                                             // Shell convention: 128 + terminating signal number.
+                                             constexpr int kSignalExitBase = 128;
+                                             ExitCode = kSignalExitBase + WTERMSIG(Status);
+                                         }
+                                     }
+
+                                     Callback(ExitCode);
+                                 });
+    }
+
+    void Cancel()
+    {
+        if (m_Descriptor)
+        {
+            asio::error_code Ec;
+            m_Descriptor->cancel(Ec);
+            m_Descriptor.reset();
+            // stream_descriptor closes the fd on destruction, so don't close m_PidFd separately
+            m_PidFd = -1;
+        }
+        else if (m_PidFd >= 0)
+        {
+            // Watch() failed after pidfd_open succeeded — close it ourselves.
+            close(m_PidFd);
+            m_PidFd = -1;
+        }
+    }
+};
+
+// ============================================================================
+// Windows: object_handle::async_wait
+// ============================================================================
+
+#elif ZEN_PLATFORM_WINDOWS
+
+// Windows implementation: a process handle is a waitable object, so
+// asio::windows::object_handle::async_wait signals exit without polling.
+struct ProcessExitWatcher::Impl
+{
+    asio::io_context& m_IoContext;
+    std::unique_ptr<asio::windows::object_handle> m_ObjectHandle;
+    void* m_DuplicatedHandle = nullptr; // owned: by m_ObjectHandle once created
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+    ~Impl() { Cancel(); }
+
+    void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit)
+    {
+        // Duplicate the process handle so ASIO can take ownership independently.
+        // SYNCHRONIZE is needed for the wait, PROCESS_QUERY_INFORMATION for
+        // GetExitCodeProcess in the completion handler.
+        HANDLE SourceHandle = static_cast<HANDLE>(Handle.Handle());
+        HANDLE CurrentProcess = GetCurrentProcess();
+        BOOL Success = DuplicateHandle(CurrentProcess,
+                                       SourceHandle,
+                                       CurrentProcess,
+                                       reinterpret_cast<LPHANDLE>(&m_DuplicatedHandle),
+                                       SYNCHRONIZE | PROCESS_QUERY_INFORMATION,
+                                       FALSE,
+                                       0);
+
+        if (!Success)
+        {
+            // NOTE(review): on failure the exit callback never fires — confirm
+            // callers can tolerate a silent no-op watcher.
+            ZEN_WARN("DuplicateHandle failed for pid {}: {}", Handle.Pid(), GetLastError());
+            return;
+        }
+
+        // object_handle takes ownership of the handle
+        m_ObjectHandle = std::make_unique<asio::windows::object_handle>(m_IoContext, m_DuplicatedHandle);
+
+        // The duplicated handle is captured by value so GetExitCodeProcess can
+        // still use it in the handler even as the member is cleared elsewhere.
+        m_ObjectHandle->async_wait([this, DupHandle = m_DuplicatedHandle, Callback = std::move(OnExit)](const asio::error_code& Ec) {
+            if (Ec)
+            {
+                return;
+            }
+
+            DWORD ExitCode = 0;
+            GetExitCodeProcess(static_cast<HANDLE>(DupHandle), &ExitCode);
+            Callback(static_cast<int>(ExitCode));
+        });
+    }
+
+    void Cancel()
+    {
+        if (m_ObjectHandle)
+        {
+            asio::error_code Ec;
+            m_ObjectHandle->cancel(Ec);
+            m_ObjectHandle.reset(); // Closes the duplicated handle
+            m_DuplicatedHandle = nullptr;
+        }
+        else if (m_DuplicatedHandle)
+        {
+            // Watch() failed between DuplicateHandle and object_handle creation.
+            CloseHandle(static_cast<HANDLE>(m_DuplicatedHandle));
+            m_DuplicatedHandle = nullptr;
+        }
+    }
+};
+
+// ============================================================================
+// macOS: kqueue EVFILT_PROC + stream_descriptor
+// ============================================================================
+
+#elif ZEN_PLATFORM_MAC
+
+// macOS implementation: a kqueue registered for EVFILT_PROC/NOTE_EXIT becomes
+// readable when the watched process exits; asio's async_wait on the kqueue fd
+// delivers that readiness on the io_context without polling.
+struct ProcessExitWatcher::Impl
+{
+    asio::io_context& m_IoContext;
+    std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor;
+    int m_KqueueFd = -1; // owned: either by m_Descriptor or closed in Cancel()
+    int m_Pid = 0;
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {}
+
+    ~Impl() { Cancel(); }
+
+    void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit)
+    {
+        m_Pid = Handle.Pid();
+
+        m_KqueueFd = kqueue();
+        if (m_KqueueFd < 0)
+        {
+            // NOTE(review): on failure the exit callback never fires — confirm
+            // callers can tolerate a silent no-op watcher.
+            ZEN_WARN("kqueue() failed for pid {}: {}", m_Pid, strerror(errno));
+            return;
+        }
+
+        // Register interest in the process exit event. EV_ONESHOT: the event
+        // auto-deregisters after firing once, which matches our single-shot API.
+        struct kevent Change;
+        EV_SET(&Change, static_cast<uintptr_t>(m_Pid), EVFILT_PROC, EV_ADD | EV_ONESHOT, NOTE_EXIT, 0, nullptr);
+
+        if (kevent(m_KqueueFd, &Change, 1, nullptr, 0, nullptr) < 0)
+        {
+            // Registration fails if the pid no longer exists, among other causes.
+            ZEN_WARN("kevent register failed for pid {}: {}", m_Pid, strerror(errno));
+            close(m_KqueueFd);
+            m_KqueueFd = -1;
+            return;
+        }
+
+        m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, m_KqueueFd);
+
+        m_Descriptor->async_wait(asio::posix::stream_descriptor::wait_read,
+                                 [this, Callback = std::move(OnExit)](const asio::error_code& Ec) {
+                                     if (Ec)
+                                     {
+                                         return;
+                                     }
+
+                                     // Drain the kqueue event (zero timeout: it is already pending)
+                                     struct kevent Event;
+                                     struct timespec Timeout = {0, 0};
+                                     kevent(m_KqueueFd, nullptr, 0, &Event, 1, &Timeout);
+
+                                     // -1 is reported when the status cannot be collected
+                                     // (e.g. the child was already reaped elsewhere).
+                                     int ExitCode = -1;
+                                     int Status = 0;
+                                     if (waitpid(m_Pid, &Status, WNOHANG) > 0)
+                                     {
+                                         if (WIFEXITED(Status))
+                                         {
+                                             ExitCode = WEXITSTATUS(Status);
+                                         }
+                                         else if (WIFSIGNALED(Status))
+                                         {
+                                             // Shell convention: 128 + terminating signal number.
+                                             constexpr int kSignalExitBase = 128;
+                                             ExitCode = kSignalExitBase + WTERMSIG(Status);
+                                         }
+                                     }
+
+                                     Callback(ExitCode);
+                                 });
+    }
+
+    void Cancel()
+    {
+        if (m_Descriptor)
+        {
+            asio::error_code Ec;
+            m_Descriptor->cancel(Ec);
+            m_Descriptor.reset();
+            // stream_descriptor closes the kqueue fd on destruction
+            m_KqueueFd = -1;
+        }
+        else if (m_KqueueFd >= 0)
+        {
+            // Watch() failed after kqueue() succeeded — close it ourselves.
+            close(m_KqueueFd);
+            m_KqueueFd = -1;
+        }
+    }
+};
+
+#endif
+
+// ============================================================================
+// Common wrapper (delegates to Impl)
+// ============================================================================
+
+// Thin pimpl forwarding layer over the platform-specific Impl selected above.
+ProcessExitWatcher::ProcessExitWatcher(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext))
+{
+}
+
+// Defaulted here (not in the header) so ~unique_ptr<Impl> sees the complete type.
+ProcessExitWatcher::~ProcessExitWatcher() = default;
+
+void
+ProcessExitWatcher::Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit)
+{
+    m_Impl->Watch(Handle, std::move(OnExit));
+}
+
+void
+ProcessExitWatcher::Cancel()
+{
+    m_Impl->Cancel();
+}
+
+} // namespace zen
diff --git a/src/zenutil/process/exitwatcher.h b/src/zenutil/process/exitwatcher.h
new file mode 100644
index 000000000..24906d7d0
--- /dev/null
+++ b/src/zenutil/process/exitwatcher.h
@@ -0,0 +1,48 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/process.h>
+#include <zencore/zencore.h>
+
+#include <functional>
+#include <memory>
+
+namespace asio {
+class io_context;
+}
+
+namespace zen {
+
+/// Async process exit watcher.
+///
+/// Uses platform-specific mechanisms for scalable, non-polling exit detection:
+/// Linux: pidfd_open() + asio::posix::stream_descriptor
+/// Windows: asio::windows::object_handle
+/// macOS: kqueue EVFILT_PROC/NOTE_EXIT + asio::posix::stream_descriptor
+///
+/// The callback is invoked exactly once when the process exits, posted to the
+/// io_context. Call Cancel() to suppress the callback.
+class ProcessExitWatcher
+{
+public:
+    explicit ProcessExitWatcher(asio::io_context& IoContext);
+    ~ProcessExitWatcher();
+
+    // Non-copyable: the Impl owns an OS handle/fd with in-flight async waits.
+    ProcessExitWatcher(const ProcessExitWatcher&) = delete;
+    ProcessExitWatcher& operator=(const ProcessExitWatcher&) = delete;
+
+    /// Begin watching the given process. The callback is posted to the io_context
+    /// when the process exits. Only one Watch() may be active at a time.
+    /// NOTE(review): if platform setup fails (old kernel, dead pid, handle
+    /// duplication failure) the watcher logs and the callback never fires.
+    void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit);
+
+    /// Cancel any outstanding watch. The callback will not be invoked after this
+    /// returns. Safe to call if no watch is active.
+    void Cancel();
+
+private:
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp
new file mode 100644
index 000000000..3a91b0a61
--- /dev/null
+++ b/src/zenutil/process/subprocessmanager.cpp
@@ -0,0 +1,1811 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/process/subprocessmanager.h>
+
+#include "asyncpipereader.h"
+#include "exitwatcher.h"
+
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+
+#include <algorithm>
+#include <atomic>
+#include <numeric>
+#include <random>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#else
+# include <csignal>
+#endif
+#include <asio/io_context.hpp>
+#include <asio/steady_timer.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+// ============================================================================
+// ManagedProcess::Impl
+// ============================================================================
+
+// Per-process state: the OS handle, exit watcher, optional stdout/stderr
+// readers with capture buffers, and the latest sampled metrics.
+struct ManagedProcess::Impl
+{
+    asio::io_context& m_IoContext;
+    ProcessHandle m_Handle;
+    ProcessExitWatcher m_ExitWatcher;
+    ProcessExitCallback m_ExitCallback;
+    std::atomic<bool> m_Exited{false}; // set by the exit watcher callback
+
+    // Stdout capture. When no callback is set, data accumulates in
+    // m_CapturedStdout under m_StdoutLock.
+    std::unique_ptr<AsyncPipeReader> m_StdoutReader;
+    ProcessDataCallback m_StdoutCallback;
+    mutable RwLock m_StdoutLock;
+    std::string m_CapturedStdout;
+
+    // Stderr capture (same scheme as stdout)
+    std::unique_ptr<AsyncPipeReader> m_StderrReader;
+    ProcessDataCallback m_StderrCallback;
+    mutable RwLock m_StderrLock;
+    std::string m_CapturedStderr;
+
+    // Metrics. Written by SampleMetrics() on the io_context.
+    // NOTE(review): m_LastMetrics and the m_Prev* fields are read/written
+    // without a lock — GetLatestMetrics() copies m_LastMetrics concurrently;
+    // confirm accessors run on the io_context thread or torn reads are acceptable.
+    ProcessMetrics m_LastMetrics;
+    std::atomic<float> m_CpuUsagePercent{-1.0f}; // -1 until two samples exist
+    uint64_t m_PrevUserTimeMs = 0;
+    uint64_t m_PrevKernelTimeMs = 0;
+    uint64_t m_PrevSampleTicks = 0; // 0 => no previous sample yet
+
+    // User tag (free-form label attached via SetTag)
+    std::string m_Tag;
+
+    explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext), m_ExitWatcher(IoContext) {}
+
+    // Route a stdout chunk: per-process callback if set, else append to the
+    // capture buffer.
+    void OnStdoutData(ManagedProcess& Self, std::string_view Data)
+    {
+        if (m_StdoutCallback)
+        {
+            m_StdoutCallback(Self, Data);
+        }
+        else
+        {
+            RwLock::ExclusiveLockScope $(m_StdoutLock);
+            m_CapturedStdout.append(Data);
+        }
+    }
+
+    // Route a stderr chunk: per-process callback if set, else append to the
+    // capture buffer.
+    void OnStderrData(ManagedProcess& Self, std::string_view Data)
+    {
+        if (m_StderrCallback)
+        {
+            m_StderrCallback(Self, Data);
+        }
+        else
+        {
+            RwLock::ExclusiveLockScope $(m_StderrLock);
+            m_CapturedStderr.append(Data);
+        }
+    }
+
+    // Query current metrics and derive CPU usage as
+    // (delta user+kernel CPU time) / (wall-clock delta) * 100.
+    // The first call only seeds the m_Prev* baselines.
+    void SampleMetrics()
+    {
+        if (m_Exited.load())
+        {
+            return;
+        }
+
+        ProcessMetrics Metrics;
+        GetProcessMetrics(m_Handle, Metrics);
+
+        uint64_t NowTicks = GetHifreqTimerValue();
+
+        if (m_PrevSampleTicks > 0)
+        {
+            uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - m_PrevSampleTicks);
+            uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (m_PrevUserTimeMs + m_PrevKernelTimeMs);
+            if (ElapsedMs > 0)
+            {
+                // Can exceed 100 on multi-core processes (sum over all threads).
+                m_CpuUsagePercent.store(static_cast<float>(static_cast<double>(DeltaCpuTimeMs) / ElapsedMs * 100.0));
+            }
+        }
+
+        m_PrevUserTimeMs = Metrics.UserTimeMs;
+        m_PrevKernelTimeMs = Metrics.KernelTimeMs;
+        m_PrevSampleTicks = NowTicks;
+        m_LastMetrics = Metrics;
+    }
+
+    [[nodiscard]] int Pid() const { return m_Handle.Pid(); }
+
+    // Running = exit watcher hasn't fired AND the OS still reports it alive.
+    [[nodiscard]] bool IsRunning() const { return !m_Exited.load() && m_Handle.IsValid() && m_Handle.IsRunning(); }
+
+    // Snapshot of the capture buffer (copy, taken under shared lock).
+    [[nodiscard]] std::string GetCapturedStdout() const
+    {
+        RwLock::SharedLockScope $(m_StdoutLock);
+        return m_CapturedStdout;
+    }
+
+    [[nodiscard]] std::string GetCapturedStderr() const
+    {
+        RwLock::SharedLockScope $(m_StderrLock);
+        return m_CapturedStderr;
+    }
+
+    // Cancel the exit watch and stop both pipe readers (used on teardown).
+    void CancelAll()
+    {
+        m_ExitWatcher.Cancel();
+        if (m_StdoutReader)
+        {
+            m_StdoutReader->Stop();
+        }
+        if (m_StderrReader)
+        {
+            m_StderrReader->Stop();
+        }
+    }
+};
+
+// ============================================================================
+// ManagedProcess — thin public forwarding layer over ManagedProcess::Impl
+// ============================================================================
+
+ManagedProcess::ManagedProcess(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl))
+{
+}
+
+ManagedProcess::~ManagedProcess()
+{
+    // Cancel the exit watch and pipe readers before the Impl is destroyed so
+    // no async callback targets a dead object.
+    if (m_Impl)
+    {
+        m_Impl->CancelAll();
+    }
+}
+
+int
+ManagedProcess::Pid() const
+{
+    return m_Impl->Pid();
+}
+
+bool
+ManagedProcess::IsRunning() const
+{
+    return m_Impl->IsRunning();
+}
+
+const ProcessHandle&
+ManagedProcess::GetHandle() const
+{
+    return m_Impl->m_Handle;
+}
+
+// Most recently sampled metrics (see Impl::SampleMetrics for freshness caveats).
+ProcessMetrics
+ManagedProcess::GetLatestMetrics() const
+{
+    return m_Impl->m_LastMetrics;
+}
+
+// -1.0f until at least two metric samples have been taken.
+float
+ManagedProcess::GetCpuUsagePercent() const
+{
+    return m_Impl->m_CpuUsagePercent.load();
+}
+
+// Setting a callback diverts future output away from the capture buffer.
+void
+ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback)
+{
+    m_Impl->m_StdoutCallback = std::move(Callback);
+}
+
+void
+ManagedProcess::SetStderrCallback(ProcessDataCallback Callback)
+{
+    m_Impl->m_StderrCallback = std::move(Callback);
+}
+
+std::string
+ManagedProcess::GetCapturedStdout() const
+{
+    return m_Impl->GetCapturedStdout();
+}
+
+std::string
+ManagedProcess::GetCapturedStderr() const
+{
+    return m_Impl->GetCapturedStderr();
+}
+
+bool
+ManagedProcess::Kill()
+{
+    return m_Impl->m_Handle.Kill();
+}
+
+bool
+ManagedProcess::Terminate(int ExitCode)
+{
+    return m_Impl->m_Handle.Terminate(ExitCode);
+}
+
+void
+ManagedProcess::SetTag(std::string Tag)
+{
+    m_Impl->m_Tag = std::move(Tag);
+}
+
+std::string_view
+ManagedProcess::GetTag() const
+{
+    return m_Impl->m_Tag;
+}
+
+// ============================================================================
+// SubprocessManager::Impl
+// ============================================================================
+
+// Central bookkeeping: owns ungrouped processes and groups, maintains a
+// non-owning pid index for round-robin metrics sampling, and drives the
+// periodic metrics timer on the io_context.
+struct SubprocessManager::Impl
+{
+    asio::io_context& m_IoContext;
+    SubprocessManagerConfig m_Config;
+
+    // Ungrouped processes (owning)
+    mutable RwLock m_Lock;
+    std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes;
+
+    // Groups (owning)
+    mutable RwLock m_GroupsLock;
+    std::unordered_map<std::string, std::unique_ptr<ProcessGroup>> m_Groups;
+
+    // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling.
+    // m_KeyOrder preserves insertion order; m_NextSampleIndex is the rotation cursor.
+    mutable RwLock m_MetricsLock;
+    std::unordered_map<int, ManagedProcess*> m_AllProcesses; // non-owning
+    std::vector<int> m_KeyOrder;
+    size_t m_NextSampleIndex = 0;
+
+    // Fallback output callbacks used when a process has none of its own
+    ProcessDataCallback m_DefaultStdoutCallback;
+    ProcessDataCallback m_DefaultStderrCallback;
+
+    std::unique_ptr<asio::steady_timer> m_MetricsTimer; // null when sampling disabled
+    std::atomic<bool> m_Running{true};                  // cleared in ~Impl to stop the timer loop
+
+    explicit Impl(asio::io_context& IoContext, SubprocessManagerConfig Config);
+    ~Impl();
+
+    // Process registry
+    ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process);
+    void RegisterForMetrics(int Pid, ManagedProcess* Ptr);
+    void UnregisterFromMetrics(int Pid);
+    ManagedProcess* FindProcess(int Pid) const;
+
+    // Async wiring for a newly added process
+    void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit);
+    void SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe);
+    void SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe);
+
+    // Lifecycle
+    ManagedProcess* Spawn(const std::filesystem::path& Executable,
+                          std::string_view CommandLine,
+                          CreateProcOptions& Options,
+                          ProcessExitCallback OnExit);
+    ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit);
+    void Remove(int Pid);
+    void RemoveAll();
+
+    void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); }
+    void SetDefaultStderrCallback(ProcessDataCallback Callback) { m_DefaultStderrCallback = std::move(Callback); }
+
+    // Metrics
+    void EnqueueMetricsTimer();
+    void SampleBatch();
+    std::vector<TrackedProcessEntry> GetMetricsSnapshot() const;
+    AggregateProcessMetrics GetAggregateMetrics() const;
+    [[nodiscard]] size_t GetProcessCount() const;
+    void Enumerate(std::function<void(const ManagedProcess&)> Callback) const;
+
+    // Groups
+    ProcessGroup* CreateGroup(std::string Name);
+    void DestroyGroup(std::string_view Name);
+    ProcessGroup* FindGroup(std::string_view Name) const;
+    void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const;
+};
+
+// ============================================================================
+// SubprocessManager::Impl method definitions
+// ============================================================================
+
+// Starts the periodic metrics timer iff sampling is enabled in the config.
+SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConfig Config) : m_IoContext(IoContext), m_Config(Config)
+{
+    if (m_Config.MetricsSampleIntervalMs > 0)
+    {
+        m_MetricsTimer = std::make_unique<asio::steady_timer>(IoContext);
+        EnqueueMetricsTimer();
+    }
+}
+
+// Teardown order matters: stop the timer loop, drop groups, then drop
+// ungrouped processes.
+SubprocessManager::Impl::~Impl()
+{
+    m_Running = false;
+    if (m_MetricsTimer)
+    {
+        m_MetricsTimer->cancel();
+    }
+
+    // Destroy groups first (they reference m_Manager back to us)
+    {
+        RwLock::ExclusiveLockScope $(m_GroupsLock);
+        m_Groups.clear();
+    }
+
+    RemoveAll();
+}
+
+// Take ownership of the process and index it for metrics sampling.
+// Returns a non-owning pointer valid until the process is removed.
+ManagedProcess*
+SubprocessManager::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process)
+{
+    int Pid = Process->Pid();
+    ManagedProcess* Ptr = Process.get();
+
+    {
+        RwLock::ExclusiveLockScope $(m_Lock);
+        m_Processes[Pid] = std::move(Process);
+    }
+
+    RegisterForMetrics(Pid, Ptr);
+    return Ptr;
+}
+
+// Add a pid to the round-robin metrics index (non-owning).
+void
+SubprocessManager::Impl::RegisterForMetrics(int Pid, ManagedProcess* Ptr)
+{
+    RwLock::ExclusiveLockScope $(m_MetricsLock);
+    m_AllProcesses[Pid] = Ptr;
+    m_KeyOrder.push_back(Pid);
+}
+
+// Remove a pid from the metrics index and keep the round-robin cursor in range.
+void
+SubprocessManager::Impl::UnregisterFromMetrics(int Pid)
+{
+    RwLock::ExclusiveLockScope $(m_MetricsLock);
+    m_AllProcesses.erase(Pid);
+    m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end());
+    if (m_NextSampleIndex >= m_KeyOrder.size())
+    {
+        m_NextSampleIndex = 0;
+    }
+}
+
+// Look up a live process by pid; nullptr if unknown/removed.
+// NOTE(review): the returned raw pointer is only as long-lived as the owning
+// map entry — callers must not hold it across a Remove() on another thread.
+ManagedProcess*
+SubprocessManager::Impl::FindProcess(int Pid) const
+{
+    RwLock::SharedLockScope $(m_MetricsLock);
+    auto It = m_AllProcesses.find(Pid);
+    if (It != m_AllProcesses.end())
+    {
+        return It->second;
+    }
+    return nullptr;
+}
+
+// Arm the per-process exit watcher. The handler re-resolves the process by
+// pid so a process removed before exit simply drops the callback.
+void
+SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit)
+{
+    int Pid = Proc->Pid();
+
+    Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) {
+        ManagedProcess* Found = FindProcess(Pid);
+
+        if (Found)
+        {
+            Found->m_Impl->m_Exited.store(true);
+            Callback(*Found, ExitCode);
+        }
+    });
+}
+
+// Start the stdout reader. Chunk routing priority:
+//   1. per-process callback  2. manager default callback  3. capture buffer
+// (OnStdoutData re-checks the per-process callback, so the buffering branch
+// is the effective fallback.) EOF is intentionally ignored ([] {}).
+void
+SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe)
+{
+    int Pid = Proc->Pid();
+    Proc->m_Impl->m_StdoutReader = std::make_unique<AsyncPipeReader>(m_IoContext);
+    Proc->m_Impl->m_StdoutReader->Start(
+        std::move(Pipe),
+        [this, Pid](std::string_view Data) {
+            // Resolve by pid each chunk: guards against a removed process.
+            ManagedProcess* Found = FindProcess(Pid);
+            if (Found)
+            {
+                if (Found->m_Impl->m_StdoutCallback)
+                {
+                    Found->m_Impl->m_StdoutCallback(*Found, Data);
+                }
+                else if (m_DefaultStdoutCallback)
+                {
+                    m_DefaultStdoutCallback(*Found, Data);
+                }
+                else
+                {
+                    Found->m_Impl->OnStdoutData(*Found, Data);
+                }
+            }
+        },
+        [] {});
+}
+
+// Start the stderr reader — mirrors SetupStdoutReader with the stderr
+// callback chain and capture buffer.
+void
+SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe)
+{
+    int Pid = Proc->Pid();
+    Proc->m_Impl->m_StderrReader = std::make_unique<AsyncPipeReader>(m_IoContext);
+    Proc->m_Impl->m_StderrReader->Start(
+        std::move(Pipe),
+        [this, Pid](std::string_view Data) {
+            ManagedProcess* Found = FindProcess(Pid);
+            if (Found)
+            {
+                if (Found->m_Impl->m_StderrCallback)
+                {
+                    Found->m_Impl->m_StderrCallback(*Found, Data);
+                }
+                else if (m_DefaultStderrCallback)
+                {
+                    m_DefaultStderrCallback(*Found, Data);
+                }
+                else
+                {
+                    Found->m_Impl->OnStderrData(*Found, Data);
+                }
+            }
+        },
+        [] {});
+}
+
// Launches a child process and wires it into the manager: exit detection,
// optional stdout/stderr capture, and metrics registration. The returned
// pointer is owned by the manager and stays valid until Remove()/RemoveAll().
ManagedProcess*
SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable,
                               std::string_view CommandLine,
                               CreateProcOptions& Options,
                               ProcessExitCallback OnExit)
{
    // Snapshot pipe presence up front so the readers below don't depend on
    // whatever CreateProc may do to Options.
    bool HasStdout = Options.StdoutPipe != nullptr;
    bool HasStderr = Options.StderrPipe != nullptr;

    // NOTE(review): the CreateProc result is not checked for failure here —
    // confirm CreateProc reports spawn errors some other way (or cannot fail).
    CreateProcResult Result = CreateProc(Executable, CommandLine, Options);

    auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext);
#if ZEN_PLATFORM_WINDOWS
    // Windows: adopt the full handle/pid pair from the spawn result
    ImplPtr->m_Handle.Initialize(Result);
#else
    // POSIX: the result converts to the child pid
    ImplPtr->m_Handle.Initialize(static_cast<int>(Result));
#endif

    auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));

    ManagedProcess* Ptr = AddProcess(std::move(Proc));
    SetupExitWatcher(Ptr, std::move(OnExit));

    if (HasStdout)
    {
        SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe));
    }
    if (HasStderr)
    {
        SetupStderrReader(Ptr, std::move(*Options.StderrPipe));
    }

    return Ptr;
}
+
// Takes over management of an externally-created process: re-initializes a
// fresh handle from the pid, registers exit detection and metrics. The
// caller's handle is reset so ownership transfers cleanly.
ManagedProcess*
SubprocessManager::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit)
{
    int Pid = Handle.Pid();

    auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext);
    ImplPtr->m_Handle.Initialize(Pid);

    // Reset the original handle so caller doesn't double-close
    Handle.Reset();

    auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));

    ManagedProcess* Ptr = AddProcess(std::move(Proc));
    SetupExitWatcher(Ptr, std::move(OnExit));

    return Ptr;
}
+
+void
+SubprocessManager::Impl::Remove(int Pid)
+{
+ UnregisterFromMetrics(Pid);
+
+ RwLock::ExclusiveLockScope $(m_Lock);
+ auto It = m_Processes.find(Pid);
+ if (It != m_Processes.end())
+ {
+ It->second->m_Impl->CancelAll();
+ m_Processes.erase(It);
+ }
+}
+
+void
+SubprocessManager::Impl::RemoveAll()
+{
+ {
+ RwLock::ExclusiveLockScope $(m_Lock);
+ for (auto& [Pid, Proc] : m_Processes)
+ {
+ Proc->m_Impl->CancelAll();
+ }
+ m_Processes.clear();
+ }
+
+ {
+ RwLock::ExclusiveLockScope $(m_MetricsLock);
+ m_AllProcesses.clear();
+ m_KeyOrder.clear();
+ m_NextSampleIndex = 0;
+ }
+}
+
// Arms the periodic metrics timer; the completion handler samples one batch
// and re-arms, forming a self-rescheduling chain that stops when the timer
// is cancelled (Ec set) or m_Running is cleared.
// NOTE(review): the handler captures raw `this` — assumes the Impl cancels
// the timer (or stops the io_context) before destruction; confirm shutdown
// order.
void
SubprocessManager::Impl::EnqueueMetricsTimer()
{
    // Metrics disabled (no timer allocated) or manager shutting down
    if (!m_MetricsTimer || !m_Running.load())
    {
        return;
    }

    m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs));
    m_MetricsTimer->async_wait([this](const asio::error_code& Ec) {
        if (Ec || !m_Running.load())
        {
            return;
        }

        SampleBatch();
        EnqueueMetricsTimer();
    });
}
+
+void
+SubprocessManager::Impl::SampleBatch()
+{
+ RwLock::SharedLockScope $(m_MetricsLock);
+
+ if (m_KeyOrder.empty())
+ {
+ return;
+ }
+
+ size_t Remaining = std::min(static_cast<size_t>(m_Config.MetricsBatchSize), m_KeyOrder.size());
+
+ while (Remaining > 0)
+ {
+ if (m_NextSampleIndex >= m_KeyOrder.size())
+ {
+ m_NextSampleIndex = 0;
+ }
+
+ int Pid = m_KeyOrder[m_NextSampleIndex];
+ auto It = m_AllProcesses.find(Pid);
+
+ if (It != m_AllProcesses.end())
+ {
+ It->second->m_Impl->SampleMetrics();
+ }
+
+ m_NextSampleIndex++;
+ Remaining--;
+ }
+}
+
+std::vector<TrackedProcessEntry>
+SubprocessManager::Impl::GetMetricsSnapshot() const
+{
+ std::vector<TrackedProcessEntry> Result;
+
+ RwLock::SharedLockScope $(m_MetricsLock);
+ Result.reserve(m_AllProcesses.size());
+
+ for (const auto& [Pid, Proc] : m_AllProcesses)
+ {
+ TrackedProcessEntry Entry;
+ Entry.Pid = Pid;
+ Entry.Metrics = Proc->m_Impl->m_LastMetrics;
+ Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load();
+ Result.push_back(std::move(Entry));
+ }
+
+ return Result;
+}
+
+AggregateProcessMetrics
+SubprocessManager::Impl::GetAggregateMetrics() const
+{
+ AggregateProcessMetrics Agg;
+
+ RwLock::SharedLockScope $(m_MetricsLock);
+
+ for (const auto& [Pid, Proc] : m_AllProcesses)
+ {
+ const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics;
+ Agg.TotalWorkingSetSize += M.WorkingSetSize;
+ Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize;
+ Agg.TotalUserTimeMs += M.UserTimeMs;
+ Agg.TotalKernelTimeMs += M.KernelTimeMs;
+ Agg.ProcessCount++;
+ }
+
+ return Agg;
+}
+
// Number of processes currently tracked for metrics (spawned + adopted,
// manager-wide including group members).
size_t
SubprocessManager::Impl::GetProcessCount() const
{
    RwLock::SharedLockScope $(m_MetricsLock);
    return m_AllProcesses.size();
}
+
+void
+SubprocessManager::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const
+{
+ RwLock::SharedLockScope $(m_MetricsLock);
+ for (const auto& [Pid, Proc] : m_AllProcesses)
+ {
+ Callback(*Proc);
+ }
+}
+
+ProcessGroup*
+SubprocessManager::Impl::CreateGroup(std::string Name)
+{
+ auto GroupImpl = std::make_unique<ProcessGroup::Impl>(std::move(Name), *this, m_IoContext);
+ ProcessGroup* Ptr = nullptr;
+
+ auto Group = std::unique_ptr<ProcessGroup>(new ProcessGroup(std::move(GroupImpl)));
+ Ptr = Group.get();
+
+ RwLock::ExclusiveLockScope $(m_GroupsLock);
+ m_Groups[std::string(Ptr->GetName())] = std::move(Group);
+
+ return Ptr;
+}
+
// Kills every process in the named group and destroys the group. No-op if
// the name is unknown.
// NOTE(review): KillAll() runs while m_GroupsLock is held exclusively and
// internally takes the group lock and metrics lock — confirm no other path
// acquires these locks in the opposite order.
void
SubprocessManager::Impl::DestroyGroup(std::string_view Name)
{
    RwLock::ExclusiveLockScope $(m_GroupsLock);
    auto It = m_Groups.find(std::string(Name));
    if (It != m_Groups.end())
    {
        It->second->KillAll();
        m_Groups.erase(It);
    }
}
+
+ProcessGroup*
+SubprocessManager::Impl::FindGroup(std::string_view Name) const
+{
+ RwLock::SharedLockScope $(m_GroupsLock);
+ auto It = m_Groups.find(std::string(Name));
+ if (It != m_Groups.end())
+ {
+ return It->second.get();
+ }
+ return nullptr;
+}
+
+void
+SubprocessManager::Impl::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const
+{
+ RwLock::SharedLockScope $(m_GroupsLock);
+ for (const auto& [Name, Group] : m_Groups)
+ {
+ Callback(*Group);
+ }
+}
+
+// ============================================================================
+// SubprocessManager
+// ============================================================================
+
// Constructs the manager. All async work (exit watchers, pipe readers, the
// metrics timer) is posted to the supplied io_context, which the caller
// drives; Config is copied into the Impl.
SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config)
: m_Impl(std::make_unique<Impl>(IoContext, Config))
{
}

SubprocessManager::~SubprocessManager() = default;
+
// Spawns a child process under management. See Impl::Spawn for the full
// contract; the returned pointer is owned by the manager.
ManagedProcess*
SubprocessManager::Spawn(const std::filesystem::path& Executable,
                         std::string_view CommandLine,
                         CreateProcOptions& Options,
                         ProcessExitCallback OnExit)
{
    ZEN_TRACE_CPU("SubprocessManager::Spawn");
    return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit));
}

// Takes over management of an externally-created process handle.
ManagedProcess*
SubprocessManager::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit)
{
    ZEN_TRACE_CPU("SubprocessManager::Adopt");
    return m_Impl->Adopt(std::move(Handle), std::move(OnExit));
}

// Stops managing a process (does NOT kill it); its exit callback will no
// longer fire.
void
SubprocessManager::Remove(int Pid)
{
    ZEN_TRACE_CPU("SubprocessManager::Remove");
    m_Impl->Remove(Pid);
}

// Stops managing all processes (does NOT kill them).
void
SubprocessManager::RemoveAll()
{
    ZEN_TRACE_CPU("SubprocessManager::RemoveAll");
    m_Impl->RemoveAll();
}
+
// Fallback stdout handler for processes without a per-process callback.
void
SubprocessManager::SetDefaultStdoutCallback(ProcessDataCallback Callback)
{
    m_Impl->SetDefaultStdoutCallback(std::move(Callback));
}

// Fallback stderr handler for processes without a per-process callback.
void
SubprocessManager::SetDefaultStderrCallback(ProcessDataCallback Callback)
{
    m_Impl->SetDefaultStderrCallback(std::move(Callback));
}

// Point-in-time copy of last-sampled metrics for every tracked process.
std::vector<TrackedProcessEntry>
SubprocessManager::GetMetricsSnapshot() const
{
    return m_Impl->GetMetricsSnapshot();
}

// Manager-wide totals of the last-sampled metrics.
AggregateProcessMetrics
SubprocessManager::GetAggregateMetrics() const
{
    return m_Impl->GetAggregateMetrics();
}

// Number of currently managed processes (including group members).
size_t
SubprocessManager::GetProcessCount() const
{
    return m_Impl->GetProcessCount();
}

// Visits every managed process; Callback runs under a shared lock and must
// not re-enter the manager.
void
SubprocessManager::Enumerate(std::function<void(const ManagedProcess&)> Callback) const
{
    m_Impl->Enumerate(std::move(Callback));
}
+
// Creates a named process group; the returned pointer is owned by the
// manager and stays valid until DestroyGroup() with the same name.
ProcessGroup*
SubprocessManager::CreateGroup(std::string Name)
{
    ZEN_TRACE_CPU("SubprocessManager::CreateGroup");
    return m_Impl->CreateGroup(std::move(Name));
}

// Kills all processes in the named group and destroys it.
void
SubprocessManager::DestroyGroup(std::string_view Name)
{
    ZEN_TRACE_CPU("SubprocessManager::DestroyGroup");
    m_Impl->DestroyGroup(Name);
}

// Looks up a group by name; nullptr when unknown.
ProcessGroup*
SubprocessManager::FindGroup(std::string_view Name) const
{
    return m_Impl->FindGroup(Name);
}

// Visits every group; Callback runs under a shared lock and must not
// create or destroy groups.
void
SubprocessManager::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const
{
    m_Impl->EnumerateGroups(std::move(Callback));
}
+
+// ============================================================================
+// ProcessGroup::Impl
+// ============================================================================
+
// Private state of a ProcessGroup: owns its member processes and the OS
// grouping primitive (Windows JobObject / POSIX process group id).
struct ProcessGroup::Impl
{
    std::string m_Name;                 // Group name; key in the manager's group map
    SubprocessManager::Impl& m_Manager; // Back-reference; the manager outlives its groups
    asio::io_context& m_IoContext;      // Event loop for watchers/readers

    mutable RwLock m_Lock;              // Guards m_Processes
    std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes;

#if ZEN_PLATFORM_WINDOWS
    JobObject m_JobObject;              // Kill-on-close group handle
#else
    int m_Pgid = 0;                     // Process group id; 0 until the first spawn
#endif

    Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext);
    ~Impl();

    // Takes ownership of a process and registers it for metrics sampling
    ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process);

    // Spawns a child inside the group (job object / pgid assignment)
    ManagedProcess* Spawn(const std::filesystem::path& Executable,
                          std::string_view CommandLine,
                          CreateProcOptions& Options,
                          ProcessExitCallback OnExit);
    // Adopts an existing process into the group
    ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit);
    // Stops managing one member (does not kill it)
    void Remove(int Pid);
    // Terminates every member and clears the group
    void KillAll();

    AggregateProcessMetrics GetAggregateMetrics() const;
    std::vector<TrackedProcessEntry> GetMetricsSnapshot() const;
    [[nodiscard]] size_t GetProcessCount() const;
    void Enumerate(std::function<void(const ManagedProcess&)> Callback) const;
};
+
+// ============================================================================
+// ProcessGroup::Impl method definitions
+// ============================================================================
+
// On Windows the JobObject is created eagerly so every spawn/adopt can be
// assigned to it; on POSIX the pgid is assigned lazily by the first spawn.
ProcessGroup::Impl::Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext)
: m_Name(std::move(Name))
, m_Manager(Manager)
, m_IoContext(IoContext)
{
#if ZEN_PLATFORM_WINDOWS
    m_JobObject.Initialize();
#endif
}

// Destroying a group terminates all of its member processes.
ProcessGroup::Impl::~Impl()
{
    KillAll();
}
+
+ManagedProcess*
+ProcessGroup::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process)
+{
+ int Pid = Process->Pid();
+ ManagedProcess* Ptr = Process.get();
+
+ {
+ RwLock::ExclusiveLockScope $(m_Lock);
+ m_Processes[Pid] = std::move(Process);
+ }
+
+ m_Manager.RegisterForMetrics(Pid, Ptr);
+ return Ptr;
+}
+
// Spawns a child process inside this group: assigned to the JobObject on
// Windows, or placed in the group's pgid on POSIX (the first child becomes
// the group leader). Wiring (exit watcher, pipe readers, metrics) is
// delegated to the manager.
ManagedProcess*
ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable,
                          std::string_view CommandLine,
                          CreateProcOptions& Options,
                          ProcessExitCallback OnExit)
{
    // Snapshot pipe presence before CreateProc mutates/consumes Options
    bool HasStdout = Options.StdoutPipe != nullptr;
    bool HasStderr = Options.StderrPipe != nullptr;

#if ZEN_PLATFORM_WINDOWS
    if (m_JobObject.IsValid())
    {
        Options.AssignToJob = &m_JobObject;
    }
#else
    if (m_Pgid > 0)
    {
        Options.ProcessGroupId = m_Pgid;
    }
#endif

    // NOTE(review): the CreateProc result is not checked for failure here —
    // confirm CreateProc reports spawn errors some other way.
    CreateProcResult Result = CreateProc(Executable, CommandLine, Options);

    auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext);
#if ZEN_PLATFORM_WINDOWS
    ImplPtr->m_Handle.Initialize(Result);
#else
    int Pid = static_cast<int>(Result);
    ImplPtr->m_Handle.Initialize(Pid);

    // First process becomes the group leader
    // NOTE(review): m_Pgid is written without holding m_Lock — assumes all
    // spawns into a group happen from one thread; confirm.
    if (m_Pgid == 0)
    {
        m_Pgid = Pid;
    }
#endif

    auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));

    ManagedProcess* Ptr = AddProcess(std::move(Proc));
    m_Manager.SetupExitWatcher(Ptr, std::move(OnExit));

    if (HasStdout)
    {
        m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe));
    }
    if (HasStderr)
    {
        m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe));
    }

    return Ptr;
}
+
// Adopts an externally-created process into the group. On Windows the
// process is additionally assigned to the group's JobObject; on POSIX an
// adopted process keeps its original pgid (it cannot be re-parented here).
ManagedProcess*
ProcessGroup::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit)
{
    int Pid = Handle.Pid();

    auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext);
    ImplPtr->m_Handle.Initialize(Pid);
    // Reset so the caller's handle doesn't double-close
    Handle.Reset();

#if ZEN_PLATFORM_WINDOWS
    if (m_JobObject.IsValid())
    {
        m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle());
    }
#endif

    auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));

    ManagedProcess* Ptr = AddProcess(std::move(Proc));
    m_Manager.SetupExitWatcher(Ptr, std::move(OnExit));

    return Ptr;
}
+
+void
+ProcessGroup::Impl::Remove(int Pid)
+{
+ m_Manager.UnregisterFromMetrics(Pid);
+
+ RwLock::ExclusiveLockScope $(m_Lock);
+ auto It = m_Processes.find(Pid);
+ if (It != m_Processes.end())
+ {
+ It->second->m_Impl->CancelAll();
+ m_Processes.erase(It);
+ }
+}
+
// Terminates every process in the group, then clears membership. The OS
// primitive handles the bulk kill; individual kills cover any member the
// primitive missed (e.g. a POSIX adoptee in a different pgid).
void
ProcessGroup::Impl::KillAll()
{
#if ZEN_PLATFORM_WINDOWS
    // Atomic kill of everything assigned to the job object
    if (m_JobObject.IsValid())
    {
        TerminateJobObject(static_cast<HANDLE>(m_JobObject.Handle()), 1);
    }
#else
    // Negative pid broadcasts the signal to the whole process group
    if (m_Pgid > 0)
    {
        kill(-m_Pgid, SIGTERM);
    }
#endif
    // Also kill individually as fallback and clean up
    // NOTE(review): UnregisterFromMetrics takes the manager's metrics lock
    // while m_Lock is held here — confirm no path acquires these two locks
    // in the opposite order (potential deadlock otherwise).
    RwLock::ExclusiveLockScope $(m_Lock);
    for (auto& [Pid, Proc] : m_Processes)
    {
        if (Proc->IsRunning())
        {
            Proc->Kill();
        }
        m_Manager.UnregisterFromMetrics(Pid);
        Proc->m_Impl->CancelAll();
    }
    m_Processes.clear();
}
+
+AggregateProcessMetrics
+ProcessGroup::Impl::GetAggregateMetrics() const
+{
+ AggregateProcessMetrics Agg;
+
+ RwLock::SharedLockScope $(m_Lock);
+
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics;
+ Agg.TotalWorkingSetSize += M.WorkingSetSize;
+ Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize;
+ Agg.TotalUserTimeMs += M.UserTimeMs;
+ Agg.TotalKernelTimeMs += M.KernelTimeMs;
+ Agg.ProcessCount++;
+ }
+
+ return Agg;
+}
+
+std::vector<TrackedProcessEntry>
+ProcessGroup::Impl::GetMetricsSnapshot() const
+{
+ std::vector<TrackedProcessEntry> Result;
+
+ RwLock::SharedLockScope $(m_Lock);
+ Result.reserve(m_Processes.size());
+
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ TrackedProcessEntry Entry;
+ Entry.Pid = Pid;
+ Entry.Metrics = Proc->m_Impl->m_LastMetrics;
+ Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load();
+ Result.push_back(std::move(Entry));
+ }
+
+ return Result;
+}
+
// Number of processes currently belonging to this group.
size_t
ProcessGroup::Impl::GetProcessCount() const
{
    RwLock::SharedLockScope $(m_Lock);
    return m_Processes.size();
}
+
+void
+ProcessGroup::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const
+{
+ RwLock::SharedLockScope $(m_Lock);
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ Callback(*Proc);
+ }
+}
+
+// ============================================================================
+// ProcessGroup
+// ============================================================================
+
// Takes ownership of a fully-constructed Impl (built by the manager).
ProcessGroup::ProcessGroup(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl))
{
}

// Impl's destructor calls KillAll(), so destroying a group kills its members.
ProcessGroup::~ProcessGroup() = default;

// The name the group was created with (stable for the group's lifetime).
std::string_view
ProcessGroup::GetName() const
{
    return m_Impl->m_Name;
}
+
// Spawns a child process inside this group (JobObject/pgid assignment).
ManagedProcess*
ProcessGroup::Spawn(const std::filesystem::path& Executable,
                    std::string_view CommandLine,
                    CreateProcOptions& Options,
                    ProcessExitCallback OnExit)
{
    ZEN_TRACE_CPU("ProcessGroup::Spawn");
    return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit));
}

// Adopts an existing process into this group.
ManagedProcess*
ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit)
{
    ZEN_TRACE_CPU("ProcessGroup::Adopt");
    return m_Impl->Adopt(std::move(Handle), std::move(OnExit));
}

// Stops managing one member (does NOT kill it).
void
ProcessGroup::Remove(int Pid)
{
    ZEN_TRACE_CPU("ProcessGroup::Remove");
    m_Impl->Remove(Pid);
}

// Terminates every member and empties the group.
void
ProcessGroup::KillAll()
{
    ZEN_TRACE_CPU("ProcessGroup::KillAll");
    m_Impl->KillAll();
}

// Totals of the last-sampled metrics across this group's members.
AggregateProcessMetrics
ProcessGroup::GetAggregateMetrics() const
{
    return m_Impl->GetAggregateMetrics();
}

// Point-in-time metrics copy for this group's members.
std::vector<TrackedProcessEntry>
ProcessGroup::GetMetricsSnapshot() const
{
    return m_Impl->GetMetricsSnapshot();
}

// Number of members currently in the group.
size_t
ProcessGroup::GetProcessCount() const
{
    return m_Impl->GetProcessCount();
}

// Visits every member; Callback runs under a shared lock.
void
ProcessGroup::Enumerate(std::function<void(const ManagedProcess&)> Callback) const
{
    m_Impl->Enumerate(std::move(Callback));
}
+
+} // namespace zen
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/testing.h>
+
+# include <chrono>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <asio/io_context.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace zen;
+using namespace std::literals;
+
// Force-link anchor: referenced by the test runner so the linker keeps this
// translation unit (and its registered tests).
void
zen::subprocessmanager_forcelink()
{
}

namespace {

// Resolves the zentest-appstub executable, assumed to sit next to the
// current test binary.
std::filesystem::path
GetAppStubPath()
{
    std::error_code Ec;
    std::filesystem::path SelfPath = GetProcessExecutablePath(zen::GetCurrentProcessId(), Ec);
    return SelfPath.parent_path() / "zentest-appstub" ZEN_EXE_SUFFIX_LITERAL;
}

}  // namespace
+
TEST_SUITE_BEGIN("util.subprocessmanager");

// Spawns the app stub with a forced exit code and verifies the async exit
// watcher delivers both the callback and the exact code (single-threaded:
// all callbacks run inside IoContext.run_for()).
TEST_CASE("SubprocessManager.SpawnAndDetectExit")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -f=42";

    int ReceivedExitCode = -1;
    bool CallbackFired = false;

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) {
        ReceivedExitCode = ExitCode;
        CallbackFired = true;
    });

    IoContext.run_for(5s);

    CHECK(CallbackFired);
    CHECK(ReceivedExitCode == 42);
}

// Same as above but with the stub's default (clean) exit, expecting code 0.
TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string();

    int ReceivedExitCode = -1;
    bool CallbackFired = false;

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) {
        ReceivedExitCode = ExitCode;
        CallbackFired = true;
    });

    IoContext.run_for(5s);

    CHECK(CallbackFired);
    CHECK(ReceivedExitCode == 0);
}
+
// With no stdout callback installed, pipe data should accumulate in the
// process's internal buffer, readable via GetCapturedStdout().
TEST_CASE("SubprocessManager.StdoutCapture")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -echo=hello_world";

    StdoutPipeHandles StdoutPipe;
    REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe));

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;
    Options.StdoutPipe = &StdoutPipe;

    bool Exited = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });

    IoContext.run_for(5s);

    CHECK(Exited);
    std::string Captured = Proc->GetCapturedStdout();
    CHECK(Captured.find("hello_world") != std::string::npos);
}

// Same accumulation behavior for stderr via GetCapturedStderr().
TEST_CASE("SubprocessManager.StderrCapture")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -echoerr=error_msg";

    StdoutPipeHandles StdoutPipe;
    StdoutPipeHandles StderrPipe;
    REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe));
    REQUIRE(CreateOverlappedStdoutPipe(StderrPipe));

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;
    Options.StdoutPipe = &StdoutPipe;
    Options.StderrPipe = &StderrPipe;

    bool Exited = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });

    IoContext.run_for(5s);

    CHECK(Exited);
    std::string CapturedErr = Proc->GetCapturedStderr();
    CHECK(CapturedErr.find("error_msg") != std::string::npos);
}

// When a per-process callback is installed, data goes to the callback and
// nothing accumulates internally. The callback is set before the io_context
// runs, so no pipe data can arrive before it is in place.
TEST_CASE("SubprocessManager.StdoutCallback")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -echo=callback_test";

    StdoutPipeHandles StdoutPipe;
    REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe));

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;
    Options.StdoutPipe = &StdoutPipe;

    std::string ReceivedData;
    bool Exited = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });

    Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); });

    IoContext.run_for(5s);

    CHECK(Exited);
    CHECK(ReceivedData.find("callback_test") != std::string::npos);
    // When a callback is set, accumulated buffer should be empty
    CHECK(Proc->GetCapturedStdout().empty());
}
+
// With a 100ms sample interval, a ~2s-lived process should have at least one
// metrics sample (non-zero working set) within the first second.
TEST_CASE("SubprocessManager.MetricsSampling")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -t=2";

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    bool Exited = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });

    // Run for enough time to get metrics samples
    IoContext.run_for(1s);

    ProcessMetrics Metrics = Proc->GetLatestMetrics();
    CHECK(Metrics.WorkingSetSize > 0);

    auto Snapshot = Manager.GetMetricsSnapshot();
    CHECK(Snapshot.size() == 1);

    // Let it finish
    IoContext.run_for(3s);
    CHECK(Exited);
}

// Remove() detaches a still-running process: its exit callback must not fire
// afterwards and it must disappear from the process count.
TEST_CASE("SubprocessManager.RemoveWhileRunning")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -t=10";

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    bool CallbackFired = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; });

    int Pid = Proc->Pid();

    // Let it start
    IoContext.run_for(100ms);

    // Remove without killing — callback should NOT fire after this
    Manager.Remove(Pid);

    IoContext.run_for(500ms);

    CHECK_FALSE(CallbackFired);
    CHECK(Manager.GetProcessCount() == 0);
}

// Kill() on a long-running child must still be observed by the async exit
// watcher as a normal exit notification.
TEST_CASE("SubprocessManager.KillAndWaitForExit")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -t=60";

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    bool CallbackFired = false;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; });

    // Let it start
    IoContext.run_for(200ms);

    Proc->Kill();

    IoContext.run_for(2s);

    CHECK(CallbackFired);
}
+
// A process spawned outside the manager and adopted afterwards must still
// get exit detection with the correct exit code.
TEST_CASE("SubprocessManager.AdoptProcess")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -f=7";

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    CreateProcResult Result = CreateProc(AppStub, CmdLine, Options);

    int ReceivedExitCode = -1;

    Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; });

    IoContext.run_for(5s);

    CHECK(ReceivedExitCode == 7);
}

// A user tag set on the process must survive until (and be visible inside)
// the exit callback. The tag is set before the io_context runs, so there is
// no race with the exit notification.
TEST_CASE("SubprocessManager.UserTag")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    std::filesystem::path AppStub = GetAppStubPath();
    std::string CmdLine = AppStub.string() + " -f=0";

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    std::string ReceivedTag;

    ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess& P, int) { ReceivedTag = std::string(P.GetTag()); });

    Proc->SetTag("my-worker-1");
    CHECK(Proc->GetTag() == "my-worker-1");

    IoContext.run_for(5s);

    CHECK(ReceivedTag == "my-worker-1");
}
+
// Processes spawned through a group count toward both the group's and the
// manager's process counts, and each still gets its own exit callback.
TEST_CASE("ProcessGroup.SpawnAndMembership")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    ProcessGroup* Group = Manager.CreateGroup("test-group");
    REQUIRE(Group != nullptr);
    CHECK(Group->GetName() == "test-group");

    std::filesystem::path AppStub = GetAppStubPath();

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    int ExitCount = 0;

    std::string CmdLine1 = AppStub.string() + " -f=0";
    std::string CmdLine2 = AppStub.string() + " -f=1";

    Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; });
    Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; });

    CHECK(Group->GetProcessCount() == 2);
    CHECK(Manager.GetProcessCount() == 2);

    IoContext.run_for(5s);

    CHECK(ExitCount == 2);
}

// KillAll() terminates every member synchronously and empties the group's
// membership immediately (no io_context run needed for the count to drop).
TEST_CASE("ProcessGroup.KillAll")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    ProcessGroup* Group = Manager.CreateGroup("kill-group");

    std::filesystem::path AppStub = GetAppStubPath();

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    int ExitCount = 0;

    std::string CmdLine = AppStub.string() + " -t=60";

    Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; });
    Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; });

    // Let them start
    IoContext.run_for(200ms);
    CHECK(Group->GetProcessCount() == 2);

    Group->KillAll();
    CHECK(Group->GetProcessCount() == 0);
}
+
// Group-level aggregates cover only group members; manager-level aggregates
// include them too (group processes are registered in the manager's metrics).
TEST_CASE("ProcessGroup.AggregateMetrics")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16});

    ProcessGroup* Group = Manager.CreateGroup("metrics-group");

    std::filesystem::path AppStub = GetAppStubPath();

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    std::string CmdLine = AppStub.string() + " -t=3";

    Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
    Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});

    // Wait for metrics sampling
    IoContext.run_for(1s);

    AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics();
    CHECK(GroupAgg.ProcessCount == 2);
    CHECK(GroupAgg.TotalWorkingSetSize > 0);

    // Manager-level metrics should include group processes
    AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics();
    CHECK(ManagerAgg.ProcessCount == 2);

    Group->KillAll();
}

// DestroyGroup kills the group's processes, removes them from the manager's
// count, and makes the group unfindable.
TEST_CASE("ProcessGroup.DestroyGroup")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    ProcessGroup* Group = Manager.CreateGroup("destroy-group");

    std::filesystem::path AppStub = GetAppStubPath();

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    std::string CmdLine = AppStub.string() + " -t=60";

    Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
    Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});

    IoContext.run_for(200ms);
    CHECK(Manager.GetProcessCount() == 2);

    Manager.DestroyGroup("destroy-group");

    CHECK(Manager.FindGroup("destroy-group") == nullptr);
    CHECK(Manager.GetProcessCount() == 0);
}

// Grouped and ungrouped processes can coexist under one manager; counts and
// exit callbacks stay independent.
TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    ProcessGroup* Group = Manager.CreateGroup("mixed-group");

    std::filesystem::path AppStub = GetAppStubPath();

    CreateProcOptions Options;
    Options.Flags = CreateProcOptions::Flag_NoConsole;

    int GroupExitCount = 0;
    int UngroupedExitCode = -1;

    std::string CmdLine = AppStub.string() + " -f=0";

    // Grouped processes
    Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; });
    Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; });

    // Ungrouped process
    Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; });

    CHECK(Group->GetProcessCount() == 2);
    CHECK(Manager.GetProcessCount() == 3);

    IoContext.run_for(5s);

    CHECK(GroupExitCount == 2);
    CHECK(UngroupedExitCode == 0);
}

// FindGroup: nullptr for unknown names, stable pointer for known names.
TEST_CASE("ProcessGroup.FindGroup")
{
    asio::io_context IoContext;
    SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});

    CHECK(Manager.FindGroup("nonexistent") == nullptr);

    ProcessGroup* Group = Manager.CreateGroup("findable");
    CHECK(Manager.FindGroup("findable") == Group);
    CHECK(Manager.FindGroup("findable")->GetName() == "findable");
}
+
+TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
+{
+ // Seed for reproducibility — change to explore different orderings
+ //
+ // Note that while this is a stress test, it is still single-threaded
+
+ constexpr uint32_t Seed = 42;
+ std::mt19937 Rng(Seed);
+ ZEN_INFO("StressTest: seed={}", Seed);
+
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 200, .MetricsBatchSize = 32});
+
+ std::filesystem::path AppStub = GetAppStubPath();
+ CreateProcOptions BaseOptions;
+ BaseOptions.Flags = CreateProcOptions::Flag_NoConsole;
+
+ std::atomic<int> TotalExitCallbacks{0};
+ std::atomic<int> KilledGroupProcessCount{0};
+
+ auto MakeExitCallback = [&](std::atomic<int>& Counter) {
+ return [&Counter, &TotalExitCallbacks](ManagedProcess&, int) {
+ Counter++;
+ TotalExitCallbacks++;
+ };
+ };
+
+ // ========================================================================
+ // Phase 1: Spawn multiple groups with varied workloads
+ // ========================================================================
+
+ ZEN_INFO("StressTest: Phase 1 — spawning initial groups");
+
+ constexpr int NumInitialGroups = 8;
+ std::vector<std::string> GroupNames;
+ std::vector<std::atomic<int>> GroupExitCounts(NumInitialGroups);
+ std::uniform_int_distribution<int> ProcCountDist(5, 100);
+ std::uniform_int_distribution<int> SleepDist(1, 5);
+ std::uniform_int_distribution<int> ExitCodeDist(0, 10);
+ std::uniform_int_distribution<int> WorkloadDist(0, 2); // 0=sleep, 1=exit-code, 2=echo+exit
+ int TotalPhase1Spawned = 0;
+
+ for (int G = 0; G < NumInitialGroups; G++)
+ {
+ std::string GroupName = fmt::format("stress-group-{}", G);
+ ProcessGroup* Group = Manager.CreateGroup(GroupName);
+ GroupNames.push_back(GroupName);
+
+ int ProcCount = ProcCountDist(Rng);
+ for (int P = 0; P < ProcCount; P++)
+ {
+ std::string CmdLine;
+ int Workload = WorkloadDist(Rng);
+ if (Workload == 0)
+ {
+ int Sleep = SleepDist(Rng);
+ CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep);
+ }
+ else if (Workload == 1)
+ {
+ int Code = ExitCodeDist(Rng);
+ CmdLine = fmt::format("{} -f={}", AppStub.string(), Code);
+ }
+ else
+ {
+ int Code = ExitCodeDist(Rng);
+ CmdLine = fmt::format("{} -echo=stress_g{}_p{} -f={}", AppStub.string(), G, P, Code);
+ }
+
+ Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(GroupExitCounts[G]));
+ TotalPhase1Spawned++;
+ }
+
+ ZEN_INFO("StressTest: group '{}' spawned {} processes", GroupName, ProcCount);
+ }
+
+ ZEN_INFO("StressTest: Phase 1 total spawned: {}", TotalPhase1Spawned);
+
+ // Let processes start running and some short-lived ones exit
+ IoContext.run_for(1s);
+
+ // ========================================================================
+ // Phase 2: Randomly kill some groups, create replacements, add ungrouped
+ // ========================================================================
+
+ ZEN_INFO("StressTest: Phase 2 — random group kills and replacements");
+
+ constexpr int NumGroupsToKill = 3;
+
+ // Pick random groups to kill
+ std::vector<int> GroupIndices(NumInitialGroups);
+ std::iota(GroupIndices.begin(), GroupIndices.end(), 0);
+ std::shuffle(GroupIndices.begin(), GroupIndices.end(), Rng);
+
+ std::vector<int> KilledIndices(GroupIndices.begin(), GroupIndices.begin() + NumGroupsToKill);
+
+ for (int Idx : KilledIndices)
+ {
+ ProcessGroup* Group = Manager.FindGroup(GroupNames[Idx]);
+ if (Group)
+ {
+ size_t Count = Group->GetProcessCount();
+ ZEN_INFO("StressTest: killing group '{}' ({} processes)", GroupNames[Idx], Count);
+ Manager.DestroyGroup(GroupNames[Idx]);
+ }
+ }
+
+ // Let kills propagate
+ IoContext.run_for(500ms);
+
+ // Create replacement groups
+ std::atomic<int> ReplacementExitCount{0};
+ std::uniform_int_distribution<int> ReplacementCountDist(3, 10);
+
+ for (int R = 0; R < NumGroupsToKill; R++)
+ {
+ std::string Name = fmt::format("replacement-group-{}", R);
+ ProcessGroup* Group = Manager.CreateGroup(Name);
+ int Count = ReplacementCountDist(Rng);
+
+ for (int P = 0; P < Count; P++)
+ {
+ int Sleep = SleepDist(Rng);
+ std::string CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep);
+ Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ReplacementExitCount));
+ }
+
+ ZEN_INFO("StressTest: replacement group '{}' spawned {} processes", Name, Count);
+ }
+
+ // Also spawn some ungrouped processes
+ std::atomic<int> UngroupedExitCount{0};
+ constexpr int NumUngrouped = 10;
+
+ for (int U = 0; U < NumUngrouped; U++)
+ {
+ int ExitCode = ExitCodeDist(Rng);
+ std::string CmdLine = fmt::format("{} -f={}", AppStub.string(), ExitCode);
+ Manager.Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(UngroupedExitCount));
+ }
+
+ ZEN_INFO("StressTest: spawned {} ungrouped processes", NumUngrouped);
+
+ // Let things run
+ IoContext.run_for(2s);
+
+ // ========================================================================
+ // Phase 3: Rapid spawn/exit churn
+ // ========================================================================
+
+ ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn");
+
+ std::atomic<int> ChurnExitCount{0};
+ int TotalChurnSpawned = 0;
+ constexpr int NumChurnBatches = 10;
+ std::uniform_int_distribution<int> ChurnBatchSizeDist(10, 20);
+
+ for (int Batch = 0; Batch < NumChurnBatches; Batch++)
+ {
+ std::string Name = fmt::format("churn-batch-{}", Batch);
+ ProcessGroup* Group = Manager.CreateGroup(Name);
+ int Count = ChurnBatchSizeDist(Rng);
+
+ for (int P = 0; P < Count; P++)
+ {
+ // Immediate exit processes to stress spawn/exit path
+ std::string CmdLine = fmt::format("{} -f=0", AppStub.string());
+ Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ChurnExitCount));
+ TotalChurnSpawned++;
+ }
+
+ // Brief pump to allow some exits to be processed
+ IoContext.run_for(200ms);
+
+ // Destroy the group — any still-running processes get killed
+ Manager.DestroyGroup(Name);
+ }
+
+ ZEN_INFO("StressTest: Phase 3 spawned {} churn processes across {} batches", TotalChurnSpawned, NumChurnBatches);
+
+ // ========================================================================
+ // Phase 4: Drain and verify
+ // ========================================================================
+
+ ZEN_INFO("StressTest: Phase 4 — draining remaining processes");
+
+ // Check metrics were collected before we wind down
+ AggregateProcessMetrics Agg = Manager.GetAggregateMetrics();
+ ZEN_INFO("StressTest: aggregate metrics: {} processes, {} bytes working set", Agg.ProcessCount, Agg.TotalWorkingSetSize);
+
+ // Let remaining processes finish (replacement groups have up to 5s sleep)
+ IoContext.run_for(8s);
+
+ // Kill anything still running
+ Manager.RemoveAll();
+
+ // Final pump to process any remaining callbacks
+ IoContext.run_for(1s);
+
+ ZEN_INFO("StressTest: Results:");
+ ZEN_INFO("StressTest: total exit callbacks fired: {}", TotalExitCallbacks.load());
+ ZEN_INFO("StressTest: ungrouped exits: {}", UngroupedExitCount.load());
+ ZEN_INFO("StressTest: replacement exits: {}", ReplacementExitCount.load());
+ ZEN_INFO("StressTest: churn exits: {}", ChurnExitCount.load());
+
+ // Verify the manager is clean
+ CHECK(Manager.GetProcessCount() == 0);
+
+ // Ungrouped processes should all have exited (they were immediate-exit)
+ CHECK(UngroupedExitCount.load() == NumUngrouped);
+
+ // Verify we got a reasonable number of total callbacks
+ // (exact count is hard to predict due to killed groups, but should be > 0)
+ CHECK(TotalExitCallbacks.load() > 0);
+
+ ZEN_INFO("StressTest: PASSED — seed={}", Seed);
+}
+
+TEST_SUITE_END();
+
+#endif