diff options
| author | Stefan Boberg <[email protected]> | 2026-03-24 15:47:23 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-24 15:47:23 +0100 |
| commit | 21c2abb1bde697c31bee562465cb986a0429a299 (patch) | |
| tree | 3734d235e79a8fbed307ae5c248936d356553b61 /src | |
| parent | v5.7.25 hotpatch (#874) (diff) | |
| download | zen-21c2abb1bde697c31bee562465cb986a0429a299.tar.xz zen-21c2abb1bde697c31bee562465cb986a0429a299.zip | |
Subprocess Manager (#889)
Adds a `SubprocessManager` for managing child processes with ASIO-integrated async exit detection, stdout/stderr pipe capture, and periodic metrics sampling. Also introduces `ProcessGroup` for OS-backed process grouping (Windows JobObjects / POSIX process groups).
### SubprocessManager
- Async process exit detection using platform-native mechanisms (Windows `object_handle`, Linux `pidfd_open`, macOS `kqueue EVFILT_PROC`) — no polling
- Stdout/stderr capture via async pipe readers with per-process or default callbacks
- Periodic round-robin metrics sampling (CPU, memory) across managed processes
- Spawn, adopt, remove, kill, and enumerate managed processes
### ProcessGroup
- OS-level process grouping: Windows JobObject (kill-on-close guarantee), POSIX `setpgid` (bulk signal delivery)
- Atomic group kill via `TerminateJobObject` (Windows) or `kill(-pgid, sig)` (POSIX)
- Per-group aggregate metrics and enumeration
### ProcessHandle improvements
- Added explicit constructors from `int` (pid) and `void*` (native handle)
- Added move constructor and move assignment operator
### ProcessMetricsTracker
- Cross-platform process metrics (CPU time, working set, page faults) via `QueryProcessMetrics()`
- ASIO timer-driven periodic sampling with configurable interval and batch size
- Aggregate metrics across tracked processes
### Other changes
- Fixed `zentest-appstub` writing a spurious `Versions` file to cwd on every invocation
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencompute/runners/windowsrunner.cpp | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/process.h | 49 | ||||
| -rw-r--r-- | src/zencore/logging.cpp | 7 | ||||
| -rw-r--r-- | src/zencore/process.cpp | 53 | ||||
| -rw-r--r-- | src/zenhttp/httpclientauth.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver-test/process-tests.cpp | 25 | ||||
| -rw-r--r-- | src/zentest-appstub/zentest-appstub.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil-test/xmake.lua | 1 | ||||
| -rw-r--r-- | src/zenutil/cloud/minioprocess.cpp | 7 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/process/subprocessmanager.h | 283 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/processmetricstracker.h | 105 | ||||
| -rw-r--r-- | src/zenutil/logging/logging.cpp | 1 | ||||
| -rw-r--r-- | src/zenutil/process/asyncpipereader.cpp | 276 | ||||
| -rw-r--r-- | src/zenutil/process/asyncpipereader.h | 62 | ||||
| -rw-r--r-- | src/zenutil/process/exitwatcher.cpp | 294 | ||||
| -rw-r--r-- | src/zenutil/process/exitwatcher.h | 48 | ||||
| -rw-r--r-- | src/zenutil/process/subprocessmanager.cpp | 1811 | ||||
| -rw-r--r-- | src/zenutil/processmetricstracker.cpp | 392 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 4 |
19 files changed, 2893 insertions, 537 deletions
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp index e9a1ae8b6..cd4b646e9 100644 --- a/src/zencompute/runners/windowsrunner.cpp +++ b/src/zencompute/runners/windowsrunner.cpp @@ -172,7 +172,7 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action) LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; BOOL bInheritHandles = FALSE; - DWORD dwCreationFlags = 0; + DWORD dwCreationFlags = DETACHED_PROCESS; ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed); diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index d115bf11f..5ae7fad68 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -29,9 +29,23 @@ class ProcessHandle public: ProcessHandle(); + /// Construct by opening a handle to the process identified by @p Pid. + /// On Windows this calls OpenProcess(); on POSIX it simply stores the pid. + /// Throws std::system_error on failure. + explicit ProcessHandle(int Pid); + + /// Construct from an existing native process handle. Takes ownership — + /// the caller must not close the handle afterwards. Windows only. +#if ZEN_PLATFORM_WINDOWS + explicit ProcessHandle(void* NativeHandle); +#endif + ProcessHandle(const ProcessHandle&) = delete; ProcessHandle& operator=(const ProcessHandle&) = delete; + ProcessHandle(ProcessHandle&& Other) noexcept; + ProcessHandle& operator=(ProcessHandle&& Other) noexcept; + ~ProcessHandle(); /// Open a handle to the process identified by @p Pid. @@ -44,7 +58,9 @@ public: /// Initialize from an existing native process handle. Takes ownership — /// the caller must not close the handle afterwards. Windows only. +#if ZEN_PLATFORM_WINDOWS void Initialize(void* ProcessHandle); +#endif /// Returns true if the process is still alive. /// On Windows, queries the exit code (STILL_ACTIVE check). @@ -148,14 +164,23 @@ struct CreateProcOptions { enum { + // Allocate a new console for the child (CREATE_NEW_CONSOLE on Windows). Flag_NewConsole = 1 << 0, - Flag_Elevated = 1 << 1, + // Launch the child with elevated (administrator) privileges via ShellExecuteEx/runas. + Flag_Elevated = 1 << 1, + // Launch the child without elevation from an elevated parent, using a medium-integrity token. Flag_Unelevated = 1 << 2, - Flag_NoConsole = 1 << 3, - // This flag creates the new process in a new process group. This is relevant only on Windows, and - // allows sending ctrl-break events to the new process group without also sending it to the current - // process. + // Detach the child from all consoles (DETACHED_PROCESS on Windows). No console is + // allocated and no conhost.exe is spawned. Stdout/stderr still work when redirected + // via pipes. Prefer this for headless worker processes. + Flag_NoConsole = 1 << 3, + // Create the child in a new process group (CREATE_NEW_PROCESS_GROUP on Windows). + // Allows sending CTRL_BREAK_EVENT to the child group without affecting the parent. Flag_Windows_NewProcessGroup = 1 << 4, + // Allocate a hidden console for the child (CREATE_NO_WINDOW on Windows). Unlike + // Flag_NoConsole the child still gets a console (and a conhost.exe) but no visible + // window. Use this when the child needs a console for stdio but should not show a window. + Flag_NoWindow = 1 << 5, }; const std::filesystem::path* WorkingDirectory = nullptr; @@ -171,9 +196,16 @@ struct CreateProcOptions #if ZEN_PLATFORM_WINDOWS JobObject* AssignToJob = nullptr; // When set, the process is created suspended, assigned to the job, then resumed +#else + /// POSIX process group id. When > 0, the child is placed into this process + /// group via setpgid() before exec. Use the pid of the first child as the + /// pgid to create a group, then pass the same pgid for subsequent children. + int ProcessGroupId = 0; #endif }; +// TODO: this should really be replaced with ProcessHandle + #if ZEN_PLATFORM_WINDOWS using CreateProcResult = void*; // handle to the process #else @@ -224,9 +256,10 @@ public: JobObject(const JobObject&) = delete; JobObject& operator=(const JobObject&) = delete; - void Initialize(); - bool AssignProcess(void* ProcessHandle); - [[nodiscard]] bool IsValid() const; + void Initialize(); + bool AssignProcess(void* ProcessHandle); + [[nodiscard]] bool IsValid() const; + [[nodiscard]] void* Handle() const { return m_JobHandle; } private: void* m_JobHandle = nullptr; diff --git a/src/zencore/logging.cpp b/src/zencore/logging.cpp index 828bea6ed..5ada0cac7 100644 --- a/src/zencore/logging.cpp +++ b/src/zencore/logging.cpp @@ -414,6 +414,13 @@ InitializeLogging() { ZEN_MEMSCOPE(ELLMTag::Logging); + EnableVTMode(); + +#if ZEN_PLATFORM_WINDOWS + // Enable UTF-8 output so multi-byte characters render correctly via WriteFile + SetConsoleOutputCP(CP_UTF8); +#endif + TheDefaultLogger = LoggerRef(*Registry::Instance().DefaultLoggerRaw()); g_LoggingInitialized = true; } diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index dcb8b2422..e7baa3f8e 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -277,6 +277,46 @@ CreateStdoutPipe(StdoutPipeHandles& OutPipe) ProcessHandle::ProcessHandle() = default; +ProcessHandle::ProcessHandle(int Pid) +{ + Initialize(Pid); +} + +#if ZEN_PLATFORM_WINDOWS +ProcessHandle::ProcessHandle(void* NativeHandle) +{ + Initialize(NativeHandle); +} +#endif + +ProcessHandle::ProcessHandle(ProcessHandle&& Other) noexcept +: m_ProcessHandle(Other.m_ProcessHandle) +, m_Pid(Other.m_Pid) +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +, m_ExitCode(Other.m_ExitCode) +#endif +{ + Other.m_ProcessHandle = nullptr; + Other.m_Pid = 0; +} + +ProcessHandle& +ProcessHandle::operator=(ProcessHandle&& Other) noexcept +{ + if (this != &Other) + { + Reset(); + m_ProcessHandle = Other.m_ProcessHandle; + m_Pid = Other.m_Pid; +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + m_ExitCode = Other.m_ExitCode; +#endif + Other.m_ProcessHandle = nullptr; + Other.m_Pid = 0; + } + return *this; +} + #if ZEN_PLATFORM_WINDOWS void ProcessHandle::Initialize(void* ProcessHandle) @@ -720,6 +760,10 @@ CreateProcNormal(const std::filesystem::path& Executable, std::string_view Comma } if (Options.Flags & CreateProcOptions::Flag_NoConsole) { + CreationFlags |= DETACHED_PROCESS; + } + if (Options.Flags & CreateProcOptions::Flag_NoWindow) + { CreationFlags |= CREATE_NO_WINDOW; } if (Options.Flags & CreateProcOptions::Flag_Windows_NewProcessGroup) @@ -930,6 +974,10 @@ CreateProcUnelevated(const std::filesystem::path& Executable, std::string_view C } if (Options.Flags & CreateProcOptions::Flag_NoConsole) { + CreateProcFlags |= DETACHED_PROCESS; + } + if (Options.Flags & CreateProcOptions::Flag_NoWindow) + { CreateProcFlags |= CREATE_NO_WINDOW; } if (AssignToJob) @@ -1070,6 +1118,11 @@ CreateProc(const std::filesystem::path& Executable, std::string_view CommandLine } } + if (Options.ProcessGroupId > 0) + { + setpgid(0, Options.ProcessGroupId); + } + for (const auto& [Key, Value] : Options.Environment) { setenv(Key.c_str(), Value.c_str(), 1); diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp index 1ebf1f949..c42841922 100644 --- a/src/zenhttp/httpclientauth.cpp +++ b/src/zenhttp/httpclientauth.cpp @@ -105,7 +105,7 @@ namespace zen { namespace httpclientauth { } if (Hidden) { - ProcOptions.Flags |= CreateProcOptions::Flag_NoConsole; + ProcOptions.Flags |= CreateProcOptions::Flag_NoWindow; } const std::filesystem::path AuthTokenPath(std::filesystem::temp_directory_path() / fmt::format(".zen-auth-{}", Oid::NewOid())); diff --git a/src/zenserver-test/process-tests.cpp b/src/zenserver-test/process-tests.cpp index 649f24f54..ae11bb294 100644 --- a/src/zenserver-test/process-tests.cpp +++ b/src/zenserver-test/process-tests.cpp @@ -67,10 +67,7 @@ TEST_CASE("pipe.capture_stdout") CreateProcOptions Options; Options.StdoutPipe = &Pipe; - CreateProcResult ProcResult = CreateProc(AppStub, CommandLine, Options); - - ProcessHandle Process; - Process.Initialize(ProcResult); + ProcessHandle Process(CreateProc(AppStub, CommandLine, Options)); // Close the write end, then drain before Wait() to avoid deadlock if output fills the pipe buffer. Pipe.CloseWriteEnd(); @@ -95,10 +92,7 @@ TEST_CASE("pipe.capture_multiline") CreateProcOptions Options; Options.StdoutPipe = &Pipe; - CreateProcResult ProcResult = CreateProc(AppStub, CommandLine, Options); - - ProcessHandle Process; - Process.Initialize(ProcResult); + ProcessHandle Process(CreateProc(AppStub, CommandLine, Options)); Pipe.CloseWriteEnd(); @@ -211,10 +205,7 @@ TEST_CASE("pipe.capture_with_nonzero_exit") CreateProcOptions Options; Options.StdoutPipe = &Pipe; - CreateProcResult ProcResult = CreateProc(AppStub, CommandLine, Options); - - ProcessHandle Process; - Process.Initialize(ProcResult); + ProcessHandle Process(CreateProc(AppStub, CommandLine, Options)); Pipe.CloseWriteEnd(); @@ -237,10 +228,7 @@ TEST_CASE("pipe.stderr_on_shared_pipe") CreateProcOptions Options; Options.StdoutPipe = &Pipe; - CreateProcResult ProcResult = CreateProc(AppStub, CommandLine, Options); - - ProcessHandle Process; - Process.Initialize(ProcResult); + ProcessHandle Process(CreateProc(AppStub, CommandLine, Options)); Pipe.CloseWriteEnd(); @@ -268,10 +256,7 @@ TEST_CASE("pipe.separate_stderr") Options.StdoutPipe = &StdoutPipe; Options.StderrPipe = &StderrPipe; - CreateProcResult ProcResult = CreateProc(AppStub, CommandLine, Options); - - ProcessHandle Process; - Process.Initialize(ProcResult); + ProcessHandle Process(CreateProc(AppStub, CommandLine, Options)); StdoutPipe.CloseWriteEnd(); StderrPipe.CloseWriteEnd(); diff --git a/src/zentest-appstub/zentest-appstub.cpp b/src/zentest-appstub/zentest-appstub.cpp index 13c96ebe2..54e54edde 100644 --- a/src/zentest-appstub/zentest-appstub.cpp +++ b/src/zentest-appstub/zentest-appstub.cpp @@ -221,10 +221,10 @@ main(int argc, char* argv[]) try { - std::filesystem::path BasePath = std::filesystem::current_path(); - std::filesystem::path InputPath = std::filesystem::current_path() / "Inputs"; - std::filesystem::path OutputPath = std::filesystem::current_path() / "Outputs"; - std::filesystem::path VersionPath = std::filesystem::current_path() / "Versions"; + std::filesystem::path BasePath = std::filesystem::current_path(); + std::filesystem::path InputPath = std::filesystem::current_path() / "Inputs"; + std::filesystem::path OutputPath = std::filesystem::current_path() / "Outputs"; + std::filesystem::path VersionPath; std::vector<std::filesystem::path> ActionPaths; /* diff --git a/src/zenutil-test/xmake.lua b/src/zenutil-test/xmake.lua index c77af2482..e124c321d 100644 --- a/src/zenutil-test/xmake.lua +++ b/src/zenutil-test/xmake.lua @@ -6,4 +6,5 @@ target("zenutil-test") add_headerfiles("**.h") add_files("*.cpp") add_deps("zenutil") + add_deps("zentest-appstub") add_deps("doctest") diff --git a/src/zenutil/cloud/minioprocess.cpp b/src/zenutil/cloud/minioprocess.cpp index 565705731..457453bd8 100644 --- a/src/zenutil/cloud/minioprocess.cpp +++ b/src/zenutil/cloud/minioprocess.cpp @@ -72,11 +72,12 @@ struct MinioProcess::Impl ZEN_INFO("MinIO server started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return; } - } while (Timer.GetElapsedTimeMs() < 10000); + } while (Timer.GetElapsedTimeMs() < 30000); } - // Report failure - ZEN_WARN("MinIO server failed to start within timeout period"); + // Report failure - throw so test failures show the real cause instead of a confusing + // assertion failure later when S3 operations fail silently. + throw std::runtime_error(fmt::format("MinIO server on port {} failed to start within timeout", m_Options.Port)); } void StopMinioServer() diff --git a/src/zenutil/include/zenutil/process/subprocessmanager.h b/src/zenutil/include/zenutil/process/subprocessmanager.h new file mode 100644 index 000000000..4a25170df --- /dev/null +++ b/src/zenutil/include/zenutil/process/subprocessmanager.h @@ -0,0 +1,283 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/process.h> +#include <zencore/zencore.h> + +#include <filesystem> +#include <functional> +#include <memory> +#include <string> +#include <string_view> +#include <vector> + +namespace zen { + +/** Tracked process entry with latest metrics snapshot. + */ +struct TrackedProcessEntry +{ + int Pid = 0; + ProcessMetrics Metrics; + + // Derived CPU usage percentage (delta-based, requires two samples). + // -1.0 means not yet sampled. + float CpuUsagePercent = -1.0f; +}; + +/** Aggregate metrics across all tracked processes. + */ +struct AggregateProcessMetrics +{ + uint64_t TotalWorkingSetSize = 0; + uint64_t TotalPeakWorkingSetSize = 0; + uint64_t TotalUserTimeMs = 0; + uint64_t TotalKernelTimeMs = 0; + uint32_t ProcessCount = 0; +}; + +} // namespace zen + +namespace asio { +class io_context; +} + +namespace zen { + +class ManagedProcess; +class ProcessGroup; + +/// Callback invoked when a managed process exits. +using ProcessExitCallback = std::function<void(ManagedProcess& Process, int ExitCode)>; + +/// Callback invoked when data is read from a managed process's stdout or stderr. +using ProcessDataCallback = std::function<void(ManagedProcess& Process, std::string_view Data)>; + +/// Configuration for SubprocessManager. +struct SubprocessManagerConfig +{ + /// Interval for periodic metrics sampling. Set to 0 to disable. + uint64_t MetricsSampleIntervalMs = 5000; + + /// Number of processes sampled per metrics tick (round-robin). + uint32_t MetricsBatchSize = 16; +}; + +/// Manages a set of child processes with async exit detection, stdout/stderr +/// capture, and periodic metrics sampling. +/// +/// All callbacks are posted to the io_context and never invoked under internal +/// locks. The caller must ensure the io_context outlives this manager and that +/// its run loop is active. +/// +/// Usage: +/// asio::io_context IoContext; +/// SubprocessManager Manager(IoContext); +/// +/// StdoutPipeHandles StdoutPipe; +/// CreateStdoutPipe(StdoutPipe); +/// +/// CreateProcOptions Options; +/// Options.StdoutPipe = &StdoutPipe; +/// +/// auto* Proc = Manager.Spawn(Executable, CommandLine, Options, +/// [](ManagedProcess& P, int Code) { ... }); +class SubprocessManager +{ +public: + explicit SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config = {}); + ~SubprocessManager(); + + SubprocessManager(const SubprocessManager&) = delete; + SubprocessManager& operator=(const SubprocessManager&) = delete; + + /// Spawn a new child process and begin monitoring it. + /// + /// If Options.StdoutPipe is set, the pipe is consumed and async reading + /// begins automatically. Similarly for Options.StderrPipe. + /// + /// Returns a non-owning pointer valid until Remove() or manager destruction. + /// The exit callback fires on an io_context thread when the process terminates. + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + + /// Adopt an already-running process by handle. Takes ownership of handle internals. + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + + /// Stop monitoring a process by pid. Does NOT kill the process — call + /// process->Kill() first if needed. The exit callback will not fire after + /// this returns. + void Remove(int Pid); + + /// Remove all managed processes. + void RemoveAll(); + + /// Set default stdout callback. Per-process callbacks override this. + void SetDefaultStdoutCallback(ProcessDataCallback Callback); + + /// Set default stderr callback. Per-process callbacks override this. + void SetDefaultStderrCallback(ProcessDataCallback Callback); + + /// Snapshot of per-process metrics for all managed processes. + [[nodiscard]] std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + + /// Aggregate metrics across all managed processes. + [[nodiscard]] AggregateProcessMetrics GetAggregateMetrics() const; + + /// Number of currently managed processes. + [[nodiscard]] size_t GetProcessCount() const; + + /// Enumerate all managed processes under a shared lock. + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + + /// Create a new process group. The group is owned by this manager. + /// On Windows the group is backed by a JobObject (kill-on-close guarantee). + /// On POSIX the group uses setpgid for bulk signal delivery. + ProcessGroup* CreateGroup(std::string Name); + + /// Destroy a group by name. Kills all processes in the group first. + void DestroyGroup(std::string_view Name); + + /// Find a group by name. Returns nullptr if not found. + [[nodiscard]] ProcessGroup* FindGroup(std::string_view Name) const; + + /// Enumerate all groups. + void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const; + +private: + friend class ProcessGroup; + + struct Impl; + std::unique_ptr<Impl> m_Impl; +}; + +/// A process managed by SubprocessManager. +/// +/// Not user-constructible. Pointers obtained from Spawn()/Adopt() remain valid +/// until Remove() or manager destruction. +class ManagedProcess +{ +public: + ~ManagedProcess(); + + ManagedProcess(const ManagedProcess&) = delete; + ManagedProcess& operator=(const ManagedProcess&) = delete; + + /// Process id. + [[nodiscard]] int Pid() const; + + /// Whether the process is still running. + [[nodiscard]] bool IsRunning() const; + + /// Underlying process handle. + [[nodiscard]] const ProcessHandle& GetHandle() const; + + /// Most recently sampled metrics (best-effort snapshot). + [[nodiscard]] ProcessMetrics GetLatestMetrics() const; + + /// CPU usage percentage from the last two samples. Returns -1.0 if not + /// yet computed. + [[nodiscard]] float GetCpuUsagePercent() const; + + /// Set per-process stdout callback (overrides manager default). + void SetStdoutCallback(ProcessDataCallback Callback); + + /// Set per-process stderr callback (overrides manager default). + void SetStderrCallback(ProcessDataCallback Callback); + + /// Return all stdout captured so far. When a callback is set, output is + /// delivered there instead of being accumulated. + [[nodiscard]] std::string GetCapturedStdout() const; + + /// Return all stderr captured so far. + [[nodiscard]] std::string GetCapturedStderr() const; + + /// Graceful shutdown with fallback to forced kill. + bool Kill(); + + /// Immediate forced termination. + bool Terminate(int ExitCode); + + /// User-defined tag for identifying this process in callbacks. + void SetTag(std::string Tag); + + /// Get the user-defined tag. + [[nodiscard]] std::string_view GetTag() const; + +private: + friend class SubprocessManager; + friend class ProcessGroup; + + struct Impl; + std::unique_ptr<Impl> m_Impl; + + explicit ManagedProcess(std::unique_ptr<Impl> InImpl); +}; + +/// A group of managed processes with OS-level backing. +/// +/// On Windows: backed by a JobObject. All processes assigned on spawn. +/// Kill-on-close guarantee — if the group is destroyed, the OS terminates +/// all member processes. +/// On Linux/macOS: uses setpgid() so children share a process group. +/// Enables bulk signal delivery via kill(-pgid, sig). +/// +/// Created via SubprocessManager::CreateGroup(). Not user-constructible. +class ProcessGroup +{ +public: + ~ProcessGroup(); + + ProcessGroup(const ProcessGroup&) = delete; + ProcessGroup& operator=(const ProcessGroup&) = delete; + + /// Group name (as passed to CreateGroup). + [[nodiscard]] std::string_view GetName() const; + + /// Spawn a process into this group. + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + + /// Adopt an already-running process into this group. + /// On Windows the process is assigned to the group's JobObject. + /// On POSIX the process cannot be moved into a different process group + /// after creation, so OS-level grouping is best-effort for adopted processes. + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + + /// Remove a process from this group. Does NOT kill it. + void Remove(int Pid); + + /// Kill all processes in the group. + /// On Windows: uses TerminateJobObject for atomic group kill. + /// On POSIX: sends SIGTERM then SIGKILL to the process group. + void KillAll(); + + /// Aggregate metrics for this group's processes. + [[nodiscard]] AggregateProcessMetrics GetAggregateMetrics() const; + + /// Per-process metrics snapshot for this group. + [[nodiscard]] std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + + /// Number of processes in this group. + [[nodiscard]] size_t GetProcessCount() const; + + /// Enumerate processes in this group. + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + +private: + friend class SubprocessManager; + + struct Impl; + std::unique_ptr<Impl> m_Impl; + + explicit ProcessGroup(std::unique_ptr<Impl> InImpl); +}; + +void subprocessmanager_forcelink(); // internal + +} // namespace zen diff --git a/src/zenutil/include/zenutil/processmetricstracker.h b/src/zenutil/include/zenutil/processmetricstracker.h deleted file mode 100644 index fdeae2bfa..000000000 --- a/src/zenutil/include/zenutil/processmetricstracker.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/process.h> -#include <zencore/zencore.h> - -#include <memory> -#include <vector> - -namespace asio { -class io_context; -} - -namespace zen { - -/** Tracked process entry with latest metrics snapshot. - */ -struct TrackedProcessEntry -{ - int Pid = 0; - ProcessMetrics Metrics; - - // Derived CPU usage percentage (delta-based, requires two samples). - // -1.0 means not yet sampled. - float CpuUsagePercent = -1.0f; -}; - -/** Aggregate metrics across all tracked processes. - */ -struct AggregateProcessMetrics -{ - uint64_t TotalWorkingSetSize = 0; - uint64_t TotalPeakWorkingSetSize = 0; - uint64_t TotalUserTimeMs = 0; - uint64_t TotalKernelTimeMs = 0; - uint32_t ProcessCount = 0; -}; - -/** Background process metrics tracker. - * - * Maintains a set of child processes keyed by pid and periodically samples - * their resource usage (CPU times, memory) in a background thread or via - * an ASIO timer on an external io_context. - * - * The tracker does not take ownership of process handles. On Windows it - * duplicates the handle internally; on other platforms it uses the pid - * directly. - * - * Usage (dedicated thread): - * ProcessMetricsTracker Tracker; - * Tracker.Start(); - * Tracker.Add(ChildHandle); - * - * Usage (ASIO timer): - * ProcessMetricsTracker Tracker(IoContext); - * Tracker.Start(); - * Tracker.Add(ChildHandle); - */ -class ProcessMetricsTracker -{ -public: - /// Construct with a dedicated background thread for sampling. - explicit ProcessMetricsTracker(uint64_t SampleIntervalMs = 5000); - - /// Construct with an external io_context — uses an asio::steady_timer - /// instead of a dedicated thread. The caller must ensure the io_context - /// outlives this tracker and that its run loop is active. - ProcessMetricsTracker(asio::io_context& IoContext, uint64_t SampleIntervalMs = 5000); - - ~ProcessMetricsTracker(); - - ProcessMetricsTracker(const ProcessMetricsTracker&) = delete; - ProcessMetricsTracker& operator=(const ProcessMetricsTracker&) = delete; - - /// Start sampling. Spawns the background thread or enqueues the first timer. - void Start(); - - /// Stop sampling. Safe to call multiple times. - void Stop(); - - /// Add a process to track. Internally clones the handle (Windows) or - /// copies the pid (Linux/macOS). If the pid is already tracked, replaces it. - void Add(const ProcessHandle& Handle); - - /// Remove a tracked process by pid. - void Remove(int Pid); - - /// Remove all tracked processes. - void Clear(); - - /// Returns a snapshot of metrics for all tracked processes. - std::vector<TrackedProcessEntry> GetSnapshot() const; - - /// Returns aggregate metrics across all tracked processes. - AggregateProcessMetrics GetAggregate() const; - -private: - struct Impl; - std::unique_ptr<Impl> m_Impl; -}; - -void processmetricstracker_forcelink(); // internal - -} // namespace zen diff --git a/src/zenutil/logging/logging.cpp b/src/zenutil/logging/logging.cpp index e1755414b..aa34fc50c 100644 --- a/src/zenutil/logging/logging.cpp +++ b/src/zenutil/logging/logging.cpp @@ -55,7 +55,6 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) ZEN_MEMSCOPE(ELLMTag::Logging); zen::logging::InitializeLogging(); - zen::logging::EnableVTMode(); // Sinks diff --git a/src/zenutil/process/asyncpipereader.cpp b/src/zenutil/process/asyncpipereader.cpp new file mode 100644 index 000000000..2fdcda30d --- /dev/null +++ b/src/zenutil/process/asyncpipereader.cpp @@ -0,0 +1,276 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "asyncpipereader.h" + +#include <zencore/logging.h> + +#include <array> + +ZEN_THIRD_PARTY_INCLUDES_START + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +# include <asio/io_context.hpp> +# include <asio/windows/stream_handle.hpp> +#else +# include <fcntl.h> +# include <unistd.h> +# include <asio/io_context.hpp> +# include <asio/posix/stream_descriptor.hpp> +#endif + +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +static constexpr size_t kReadBufferSize = 4096; + +// ============================================================================ +// POSIX: non-blocking pipe + stream_descriptor +// ============================================================================ + +#if !ZEN_PLATFORM_WINDOWS + +struct AsyncPipeReader::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor; + std::function<void(std::string_view)> m_DataCallback; + std::function<void()> m_EofCallback; + std::array<char, kReadBufferSize> m_Buffer{}; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Stop(); } + + void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) + { + m_DataCallback = std::move(DataCallback); + m_EofCallback = std::move(EofCallback); + + int Fd = Pipe.ReadFd; + + // Close the write end — child already has it + Pipe.CloseWriteEnd(); + + // Set non-blocking + int Flags = fcntl(Fd, F_GETFL, 0); + fcntl(Fd, F_SETFL, Flags | O_NONBLOCK); + + // Take ownership of the fd. Detach it from StdoutPipeHandles so it + // doesn't get double-closed. + Pipe.ReadFd = -1; + + m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, Fd); + EnqueueRead(); + } + + void Stop() + { + if (m_Descriptor) + { + asio::error_code Ec; + m_Descriptor->cancel(Ec); + m_Descriptor.reset(); + } + } + + void EnqueueRead() + { + if (!m_Descriptor) + { + return; + } + + m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::operation_aborted && m_EofCallback) + { + m_EofCallback(); + } + return; + } + + if (BytesRead > 0 && m_DataCallback) + { + m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); + } + + EnqueueRead(); + }); + } +}; + +bool +CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) +{ + // On POSIX, regular pipes work fine with non-blocking I/O + return CreateStdoutPipe(OutPipe); +} + +// ============================================================================ +// Windows: overlapped named pipe + asio::windows::stream_handle +// +// Anonymous pipes (CreatePipe) do not support overlapped I/O. Instead, we +// create a named pipe pair with FILE_FLAG_OVERLAPPED on the read (server) end. +// The write (client) end is inheritable and used as the child's stdout/stderr. +// +// Callers must use CreateOverlappedStdoutPipe() instead of CreateStdoutPipe() +// so the pipe is overlapped from the start. Passing a non-overlapped anonymous +// pipe to Start() will fail. +// ============================================================================ + +#else // ZEN_PLATFORM_WINDOWS + +static std::atomic<uint64_t> s_PipeSerial{0}; + +bool +CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) +{ + // Generate a unique pipe name + uint64_t Serial = s_PipeSerial.fetch_add(1); + wchar_t PipeName[128]; + swprintf_s(PipeName, + _countof(PipeName), + L"\\\\.\\pipe\\zen_async_%u_%llu", + GetCurrentProcessId(), + static_cast<unsigned long long>(Serial)); + + // Create the server (read) end with FILE_FLAG_OVERLAPPED + HANDLE ReadHandle = CreateNamedPipeW(PipeName, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, // max instances + 0, // out buffer size + kReadBufferSize, + 0, // default timeout + nullptr); + + if (ReadHandle == INVALID_HANDLE_VALUE) + { + ZEN_WARN("CreateNamedPipeW failed: {}", GetLastError()); + return false; + } + + // The read end should not be inherited by the child + SetHandleInformation(ReadHandle, HANDLE_FLAG_INHERIT, 0); + + // Open the client (write) end — inheritable, for the child process + SECURITY_ATTRIBUTES Sa; + Sa.nLength = sizeof(Sa); + Sa.lpSecurityDescriptor = nullptr; + Sa.bInheritHandle = TRUE; + + HANDLE WriteHandle = CreateFileW(PipeName, + GENERIC_WRITE, + 0, // no sharing + &Sa, // inheritable + OPEN_EXISTING, + 0, // no special flags on write end + nullptr); + + if (WriteHandle == INVALID_HANDLE_VALUE) + { + DWORD Err = GetLastError(); + CloseHandle(ReadHandle); + ZEN_WARN("CreateFileW for pipe client end failed: {}", Err); + return false; + } + + OutPipe.ReadHandle = ReadHandle; + OutPipe.WriteHandle = WriteHandle; + return true; +} + +struct AsyncPipeReader::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::windows::stream_handle> m_StreamHandle; + std::function<void(std::string_view)> m_DataCallback; + std::function<void()> m_EofCallback; + std::array<char, kReadBufferSize> m_Buffer{}; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Stop(); } + + void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) + { + m_DataCallback = std::move(DataCallback); + m_EofCallback = std::move(EofCallback); + + HANDLE ReadHandle = static_cast<HANDLE>(Pipe.ReadHandle); + + // Close the write end — child already has it + Pipe.CloseWriteEnd(); + + // Take ownership of the read handle + Pipe.ReadHandle = nullptr; + + m_StreamHandle = std::make_unique<asio::windows::stream_handle>(m_IoContext, ReadHandle); + EnqueueRead(); + } + + void Stop() + { + if (m_StreamHandle) + { + asio::error_code Ec; + m_StreamHandle->cancel(Ec); + m_StreamHandle.reset(); + } + } + + void EnqueueRead() + { + if (!m_StreamHandle) + { + return; + } + + m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::operation_aborted && m_EofCallback) + { + m_EofCallback(); + } + return; + } + + if (BytesRead > 0 && m_DataCallback) + { + m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); + } + + EnqueueRead(); + }); + } +}; + +#endif + +// ============================================================================ +// Common wrapper +// ============================================================================ + +AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext)) +{ +} + +AsyncPipeReader::~AsyncPipeReader() = default; + +void +AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) +{ + m_Impl->Start(std::move(Pipe), std::move(DataCallback), std::move(EofCallback)); +} + +void +AsyncPipeReader::Stop() +{ + m_Impl->Stop(); +} + +} // namespace zen diff --git a/src/zenutil/process/asyncpipereader.h b/src/zenutil/process/asyncpipereader.h new file mode 100644 index 000000000..ad2ff8455 --- /dev/null +++ b/src/zenutil/process/asyncpipereader.h @@ -0,0 +1,62 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/process.h> +#include <zencore/zencore.h> + +#include <functional> +#include <memory> +#include <string_view> + +namespace asio { +class io_context; +} + +namespace zen { + +/// Create an overlapped pipe pair suitable for async I/O on Windows. +/// +/// Unlike CreateStdoutPipe() (which creates anonymous non-overlapped pipes), +/// this creates a named pipe with FILE_FLAG_OVERLAPPED on the read end, so it +/// can be used with asio::windows::stream_handle for fully async reads. +/// The write end is inheritable and suitable for child process redirection. +/// +/// On non-Windows platforms this simply delegates to CreateStdoutPipe(). +bool CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe); + +/// Async pipe reader for capturing child process stdout/stderr. +/// +/// Takes ownership of a pipe's read end and reads asynchronously: +/// Linux/macOS: non-blocking fd + asio::posix::stream_descriptor +/// Windows: overlapped named pipe + asio::windows::stream_handle +/// +/// On Windows the pipe must have been created with CreateOverlappedStdoutPipe() +/// for async I/O to work. Pipes from CreateStdoutPipe() will fail. +/// +/// DataCallback is invoked for each chunk read (on the io_context). +/// EofCallback is invoked when the pipe closes (child exited or pipe broken). +class AsyncPipeReader +{ +public: + explicit AsyncPipeReader(asio::io_context& IoContext); + ~AsyncPipeReader(); + + AsyncPipeReader(const AsyncPipeReader&) = delete; + AsyncPipeReader& operator=(const AsyncPipeReader&) = delete; + + /// Take ownership of the pipe read-end and start async reading. + /// The write end is closed immediately (caller should have already launched + /// the child process). DataCallback receives raw chunks. EofCallback fires + /// once when the pipe reaches EOF. + void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view Data)> DataCallback, std::function<void()> EofCallback); + + /// Stop reading and close the pipe. Callbacks will not fire after this returns. + void Stop(); + +private: + struct Impl; + std::unique_ptr<Impl> m_Impl; +}; + +} // namespace zen diff --git a/src/zenutil/process/exitwatcher.cpp b/src/zenutil/process/exitwatcher.cpp new file mode 100644 index 000000000..cef31ebca --- /dev/null +++ b/src/zenutil/process/exitwatcher.cpp @@ -0,0 +1,294 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exitwatcher.h" + +#include <zencore/logging.h> + +ZEN_THIRD_PARTY_INCLUDES_START + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +# include <asio/io_context.hpp> +# include <asio/windows/object_handle.hpp> +#elif ZEN_PLATFORM_LINUX +# include <sys/syscall.h> +# include <sys/wait.h> +# include <unistd.h> +# include <asio/io_context.hpp> +# include <asio/posix/stream_descriptor.hpp> + +# ifndef SYS_pidfd_open +# define SYS_pidfd_open 434 // x86_64 +# endif +#elif ZEN_PLATFORM_MAC +# include <sys/event.h> +# include <sys/wait.h> +# include <unistd.h> +# include <asio/io_context.hpp> +# include <asio/posix/stream_descriptor.hpp> +#endif + +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +// ============================================================================ +// Linux: pidfd_open + stream_descriptor +// ============================================================================ + +#if ZEN_PLATFORM_LINUX + +struct ProcessExitWatcher::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor; + int m_PidFd = -1; + int m_Pid = 0; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Cancel(); } + + void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit) + { + m_Pid = Handle.Pid(); + + // pidfd_open returns an fd that becomes readable when the process exits. + // Available since Linux 5.3. + m_PidFd = static_cast<int>(syscall(SYS_pidfd_open, m_Pid, 0)); + if (m_PidFd < 0) + { + ZEN_WARN("pidfd_open failed for pid {}: {}", m_Pid, strerror(errno)); + return; + } + + m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, m_PidFd); + + m_Descriptor->async_wait(asio::posix::stream_descriptor::wait_read, + [this, Callback = std::move(OnExit)](const asio::error_code& Ec) { + if (Ec) + { + return; // Cancelled or error + } + + int ExitCode = -1; + int Status = 0; + // The pidfd told us the process exited. Reap it with waitpid. + if (waitpid(m_Pid, &Status, WNOHANG) > 0) + { + if (WIFEXITED(Status)) + { + ExitCode = WEXITSTATUS(Status); + } + else if (WIFSIGNALED(Status)) + { + constexpr int kSignalExitBase = 128; + ExitCode = kSignalExitBase + WTERMSIG(Status); + } + } + + Callback(ExitCode); + }); + } + + void Cancel() + { + if (m_Descriptor) + { + asio::error_code Ec; + m_Descriptor->cancel(Ec); + m_Descriptor.reset(); + // stream_descriptor closes the fd on destruction, so don't close m_PidFd separately + m_PidFd = -1; + } + else if (m_PidFd >= 0) + { + close(m_PidFd); + m_PidFd = -1; + } + } +}; + +// ============================================================================ +// Windows: object_handle::async_wait +// ============================================================================ + +#elif ZEN_PLATFORM_WINDOWS + +struct ProcessExitWatcher::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::windows::object_handle> m_ObjectHandle; + void* m_DuplicatedHandle = nullptr; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Cancel(); } + + void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit) + { + // Duplicate the process handle so ASIO can take ownership independently + HANDLE SourceHandle = static_cast<HANDLE>(Handle.Handle()); + HANDLE CurrentProcess = GetCurrentProcess(); + BOOL Success = DuplicateHandle(CurrentProcess, + SourceHandle, + CurrentProcess, + reinterpret_cast<LPHANDLE>(&m_DuplicatedHandle), + SYNCHRONIZE | PROCESS_QUERY_INFORMATION, + FALSE, + 0); + + if (!Success) + { + ZEN_WARN("DuplicateHandle failed for pid {}: {}", Handle.Pid(), GetLastError()); + return; + } + + // object_handle takes ownership of the handle + m_ObjectHandle = std::make_unique<asio::windows::object_handle>(m_IoContext, m_DuplicatedHandle); + + m_ObjectHandle->async_wait([this, DupHandle = m_DuplicatedHandle, Callback = std::move(OnExit)](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + DWORD ExitCode = 0; + GetExitCodeProcess(static_cast<HANDLE>(DupHandle), &ExitCode); + Callback(static_cast<int>(ExitCode)); + }); + } + + void Cancel() + { + if (m_ObjectHandle) + { + asio::error_code Ec; + m_ObjectHandle->cancel(Ec); + m_ObjectHandle.reset(); // Closes the duplicated handle + m_DuplicatedHandle = nullptr; + } + else if (m_DuplicatedHandle) + { + CloseHandle(static_cast<HANDLE>(m_DuplicatedHandle)); + m_DuplicatedHandle = nullptr; + } + } +}; + +// ============================================================================ +// macOS: kqueue EVFILT_PROC + stream_descriptor +// ============================================================================ + +#elif ZEN_PLATFORM_MAC + +struct ProcessExitWatcher::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor; + int m_KqueueFd = -1; + int m_Pid = 0; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Cancel(); } + + void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit) + { + m_Pid = Handle.Pid(); + + m_KqueueFd = kqueue(); + if (m_KqueueFd < 0) + { + ZEN_WARN("kqueue() failed for pid {}: {}", m_Pid, strerror(errno)); + return; + } + + // Register interest in the process exit event + struct kevent Change; + EV_SET(&Change, static_cast<uintptr_t>(m_Pid), EVFILT_PROC, EV_ADD | EV_ONESHOT, NOTE_EXIT, 0, nullptr); + + if (kevent(m_KqueueFd, &Change, 1, nullptr, 0, nullptr) < 0) + { + ZEN_WARN("kevent register failed for pid {}: {}", m_Pid, strerror(errno)); + close(m_KqueueFd); + m_KqueueFd = -1; + return; + } + + m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, m_KqueueFd); + + m_Descriptor->async_wait(asio::posix::stream_descriptor::wait_read, + [this, Callback = std::move(OnExit)](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + // Drain the kqueue event + struct kevent Event; + struct timespec Timeout = {0, 0}; + kevent(m_KqueueFd, nullptr, 0, &Event, 1, &Timeout); + + int ExitCode = -1; + int Status = 0; + if (waitpid(m_Pid, &Status, WNOHANG) > 0) + { + if (WIFEXITED(Status)) + { + ExitCode = WEXITSTATUS(Status); + } + else if (WIFSIGNALED(Status)) + { + constexpr int kSignalExitBase = 128; + ExitCode = kSignalExitBase + WTERMSIG(Status); + } + } + + Callback(ExitCode); + }); + } + + void Cancel() + { + if (m_Descriptor) + { + asio::error_code Ec; + m_Descriptor->cancel(Ec); + m_Descriptor.reset(); + // stream_descriptor closes the kqueue fd on destruction + m_KqueueFd = -1; + } + else if (m_KqueueFd >= 0) + { + close(m_KqueueFd); + m_KqueueFd = -1; + } + } +}; + +#endif + +// ============================================================================ +// Common wrapper (delegates to Impl) +// ============================================================================ + +ProcessExitWatcher::ProcessExitWatcher(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext)) +{ +} + +ProcessExitWatcher::~ProcessExitWatcher() = default; + +void +ProcessExitWatcher::Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit) +{ + m_Impl->Watch(Handle, std::move(OnExit)); +} + +void +ProcessExitWatcher::Cancel() +{ + m_Impl->Cancel(); +} + +} // namespace zen diff --git a/src/zenutil/process/exitwatcher.h b/src/zenutil/process/exitwatcher.h new file mode 100644 index 000000000..24906d7d0 --- /dev/null +++ b/src/zenutil/process/exitwatcher.h @@ -0,0 +1,48 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/process.h> +#include <zencore/zencore.h> + +#include <functional> +#include <memory> + +namespace asio { +class io_context; +} + +namespace zen { + +/// Async process exit watcher. +/// +/// Uses platform-specific mechanisms for scalable, non-polling exit detection: +/// Linux: pidfd_open() + asio::posix::stream_descriptor +/// Windows: asio::windows::object_handle +/// macOS: kqueue EVFILT_PROC/NOTE_EXIT + asio::posix::stream_descriptor +/// +/// The callback is invoked exactly once when the process exits, posted to the +/// io_context. Call Cancel() to suppress the callback. +class ProcessExitWatcher +{ +public: + explicit ProcessExitWatcher(asio::io_context& IoContext); + ~ProcessExitWatcher(); + + ProcessExitWatcher(const ProcessExitWatcher&) = delete; + ProcessExitWatcher& operator=(const ProcessExitWatcher&) = delete; + + /// Begin watching the given process. The callback is posted to the io_context + /// when the process exits. Only one Watch() may be active at a time. + void Watch(const ProcessHandle& Handle, std::function<void(int ExitCode)> OnExit); + + /// Cancel any outstanding watch. The callback will not be invoked after this + /// returns. Safe to call if no watch is active. + void Cancel(); + +private: + struct Impl; + std::unique_ptr<Impl> m_Impl; +}; + +} // namespace zen diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp new file mode 100644 index 000000000..3a91b0a61 --- /dev/null +++ b/src/zenutil/process/subprocessmanager.cpp @@ -0,0 +1,1811 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/process/subprocessmanager.h> + +#include "asyncpipereader.h" +#include "exitwatcher.h" + +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +#include <algorithm> +#include <atomic> +#include <numeric> +#include <random> +#include <string> +#include <unordered_map> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#else +# include <csignal> +#endif +#include <asio/io_context.hpp> +#include <asio/steady_timer.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +// ============================================================================ +// ManagedProcess::Impl +// ============================================================================ + +struct ManagedProcess::Impl +{ + asio::io_context& m_IoContext; + ProcessHandle m_Handle; + ProcessExitWatcher m_ExitWatcher; + ProcessExitCallback m_ExitCallback; + std::atomic<bool> m_Exited{false}; + + // Stdout capture + std::unique_ptr<AsyncPipeReader> m_StdoutReader; + ProcessDataCallback m_StdoutCallback; + mutable RwLock m_StdoutLock; + std::string m_CapturedStdout; + + // Stderr capture + std::unique_ptr<AsyncPipeReader> m_StderrReader; + ProcessDataCallback m_StderrCallback; + mutable RwLock m_StderrLock; + std::string m_CapturedStderr; + + // Metrics + ProcessMetrics m_LastMetrics; + std::atomic<float> m_CpuUsagePercent{-1.0f}; + uint64_t m_PrevUserTimeMs = 0; + uint64_t m_PrevKernelTimeMs = 0; + uint64_t m_PrevSampleTicks = 0; + + // User tag + std::string m_Tag; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext), m_ExitWatcher(IoContext) {} + + void OnStdoutData(ManagedProcess& Self, std::string_view Data) + { + if (m_StdoutCallback) + { + m_StdoutCallback(Self, Data); + } + else + { + RwLock::ExclusiveLockScope $(m_StdoutLock); + m_CapturedStdout.append(Data); + } + } + + void OnStderrData(ManagedProcess& Self, std::string_view Data) + { + if (m_StderrCallback) + { + m_StderrCallback(Self, Data); + } + else + { + RwLock::ExclusiveLockScope $(m_StderrLock); + m_CapturedStderr.append(Data); + } + } + + void SampleMetrics() + { + if (m_Exited.load()) + { + return; + } + + ProcessMetrics Metrics; + GetProcessMetrics(m_Handle, Metrics); + + uint64_t NowTicks = GetHifreqTimerValue(); + + if (m_PrevSampleTicks > 0) + { + uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - m_PrevSampleTicks); + uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (m_PrevUserTimeMs + m_PrevKernelTimeMs); + if (ElapsedMs > 0) + { + m_CpuUsagePercent.store(static_cast<float>(static_cast<double>(DeltaCpuTimeMs) / ElapsedMs * 100.0)); + } + } + + m_PrevUserTimeMs = Metrics.UserTimeMs; + m_PrevKernelTimeMs = Metrics.KernelTimeMs; + m_PrevSampleTicks = NowTicks; + m_LastMetrics = Metrics; + } + + [[nodiscard]] int Pid() const { return m_Handle.Pid(); } + + [[nodiscard]] bool IsRunning() const { return !m_Exited.load() && m_Handle.IsValid() && m_Handle.IsRunning(); } + + [[nodiscard]] std::string GetCapturedStdout() const + { + RwLock::SharedLockScope $(m_StdoutLock); + return m_CapturedStdout; + } + + [[nodiscard]] std::string GetCapturedStderr() const + { + RwLock::SharedLockScope $(m_StderrLock); + return m_CapturedStderr; + } + + void CancelAll() + { + m_ExitWatcher.Cancel(); + if (m_StdoutReader) + { + m_StdoutReader->Stop(); + } + if (m_StderrReader) + { + m_StderrReader->Stop(); + } + } +}; + +// ============================================================================ +// ManagedProcess +// ============================================================================ + +ManagedProcess::ManagedProcess(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl)) +{ +} + +ManagedProcess::~ManagedProcess() +{ + if (m_Impl) + { + m_Impl->CancelAll(); + } +} + +int +ManagedProcess::Pid() const +{ + return m_Impl->Pid(); +} + +bool +ManagedProcess::IsRunning() const +{ + return m_Impl->IsRunning(); +} + +const ProcessHandle& +ManagedProcess::GetHandle() const +{ + return m_Impl->m_Handle; +} + +ProcessMetrics +ManagedProcess::GetLatestMetrics() const +{ + return m_Impl->m_LastMetrics; +} + +float +ManagedProcess::GetCpuUsagePercent() const +{ + return m_Impl->m_CpuUsagePercent.load(); +} + +void +ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback) +{ + m_Impl->m_StdoutCallback = std::move(Callback); +} + +void +ManagedProcess::SetStderrCallback(ProcessDataCallback Callback) +{ + m_Impl->m_StderrCallback = std::move(Callback); +} + +std::string +ManagedProcess::GetCapturedStdout() const +{ + return m_Impl->GetCapturedStdout(); +} + +std::string +ManagedProcess::GetCapturedStderr() const +{ + return m_Impl->GetCapturedStderr(); +} + +bool +ManagedProcess::Kill() +{ + return m_Impl->m_Handle.Kill(); +} + +bool +ManagedProcess::Terminate(int ExitCode) +{ + return m_Impl->m_Handle.Terminate(ExitCode); +} + +void +ManagedProcess::SetTag(std::string Tag) +{ + m_Impl->m_Tag = std::move(Tag); +} + +std::string_view +ManagedProcess::GetTag() const +{ + return m_Impl->m_Tag; +} + +// ============================================================================ +// SubprocessManager::Impl +// ============================================================================ + +struct SubprocessManager::Impl +{ + asio::io_context& m_IoContext; + SubprocessManagerConfig m_Config; + + // Ungrouped processes + mutable RwLock m_Lock; + std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; + + // Groups + mutable RwLock m_GroupsLock; + std::unordered_map<std::string, std::unique_ptr<ProcessGroup>> m_Groups; + + // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling + mutable RwLock m_MetricsLock; + std::unordered_map<int, ManagedProcess*> m_AllProcesses; // non-owning + std::vector<int> m_KeyOrder; + size_t m_NextSampleIndex = 0; + + ProcessDataCallback m_DefaultStdoutCallback; + ProcessDataCallback m_DefaultStderrCallback; + + std::unique_ptr<asio::steady_timer> m_MetricsTimer; + std::atomic<bool> m_Running{true}; + + explicit Impl(asio::io_context& IoContext, SubprocessManagerConfig Config); + ~Impl(); + + ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process); + void RegisterForMetrics(int Pid, ManagedProcess* Ptr); + void UnregisterFromMetrics(int Pid); + ManagedProcess* FindProcess(int Pid) const; + + void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit); + void SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); + void SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); + + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + void Remove(int Pid); + void RemoveAll(); + + void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); } + void SetDefaultStderrCallback(ProcessDataCallback Callback) { m_DefaultStderrCallback = std::move(Callback); } + + void EnqueueMetricsTimer(); + void SampleBatch(); + std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + AggregateProcessMetrics GetAggregateMetrics() const; + [[nodiscard]] size_t GetProcessCount() const; + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + + ProcessGroup* CreateGroup(std::string Name); + void DestroyGroup(std::string_view Name); + ProcessGroup* FindGroup(std::string_view Name) const; + void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const; +}; + +// ============================================================================ +// SubprocessManager::Impl method definitions +// ============================================================================ + +SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConfig Config) : m_IoContext(IoContext), m_Config(Config) +{ + if (m_Config.MetricsSampleIntervalMs > 0) + { + m_MetricsTimer = std::make_unique<asio::steady_timer>(IoContext); + EnqueueMetricsTimer(); + } +} + +SubprocessManager::Impl::~Impl() +{ + m_Running = false; + if (m_MetricsTimer) + { + m_MetricsTimer->cancel(); + } + + // Destroy groups first (they reference m_Manager back to us) + { + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups.clear(); + } + + RemoveAll(); +} + +ManagedProcess* +SubprocessManager::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process) +{ + int Pid = Process->Pid(); + ManagedProcess* Ptr = Process.get(); + + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + + RegisterForMetrics(Pid, Ptr); + return Ptr; +} + +void +SubprocessManager::Impl::RegisterForMetrics(int Pid, ManagedProcess* Ptr) +{ + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses[Pid] = Ptr; + m_KeyOrder.push_back(Pid); +} + +void +SubprocessManager::Impl::UnregisterFromMetrics(int Pid) +{ + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.erase(Pid); + m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); + if (m_NextSampleIndex >= m_KeyOrder.size()) + { + m_NextSampleIndex = 0; + } +} + +ManagedProcess* +SubprocessManager::Impl::FindProcess(int Pid) const +{ + RwLock::SharedLockScope $(m_MetricsLock); + auto It = m_AllProcesses.find(Pid); + if (It != m_AllProcesses.end()) + { + return It->second; + } + return nullptr; +} + +void +SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit) +{ + int Pid = Proc->Pid(); + + Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) { + ManagedProcess* Found = FindProcess(Pid); + + if (Found) + { + Found->m_Impl->m_Exited.store(true); + Callback(*Found, ExitCode); + } + }); +} + +void +SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) +{ + int Pid = Proc->Pid(); + Proc->m_Impl->m_StdoutReader = std::make_unique<AsyncPipeReader>(m_IoContext); + Proc->m_Impl->m_StdoutReader->Start( + std::move(Pipe), + [this, Pid](std::string_view Data) { + ManagedProcess* Found = FindProcess(Pid); + if (Found) + { + if (Found->m_Impl->m_StdoutCallback) + { + Found->m_Impl->m_StdoutCallback(*Found, Data); + } + else if (m_DefaultStdoutCallback) + { + m_DefaultStdoutCallback(*Found, Data); + } + else + { + Found->m_Impl->OnStdoutData(*Found, Data); + } + } + }, + [] {}); +} + +void +SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) +{ + int Pid = Proc->Pid(); + Proc->m_Impl->m_StderrReader = std::make_unique<AsyncPipeReader>(m_IoContext); + Proc->m_Impl->m_StderrReader->Start( + std::move(Pipe), + [this, Pid](std::string_view Data) { + ManagedProcess* Found = FindProcess(Pid); + if (Found) + { + if (Found->m_Impl->m_StderrCallback) + { + Found->m_Impl->m_StderrCallback(*Found, Data); + } + else if (m_DefaultStderrCallback) + { + m_DefaultStderrCallback(*Found, Data); + } + else + { + Found->m_Impl->OnStderrData(*Found, Data); + } + } + }, + [] {}); +} + +ManagedProcess* +SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + bool HasStdout = Options.StdoutPipe != nullptr; + bool HasStderr = Options.StderrPipe != nullptr; + + CreateProcResult Result = CreateProc(Executable, CommandLine, Options); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); +#if ZEN_PLATFORM_WINDOWS + ImplPtr->m_Handle.Initialize(Result); +#else + ImplPtr->m_Handle.Initialize(static_cast<int>(Result)); +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + SetupExitWatcher(Ptr, std::move(OnExit)); + + if (HasStdout) + { + SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); + } + if (HasStderr) + { + SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); + } + + return Ptr; +} + +ManagedProcess* +SubprocessManager::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + int Pid = Handle.Pid(); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); + ImplPtr->m_Handle.Initialize(Pid); + + // Reset the original handle so caller doesn't double-close + Handle.Reset(); + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + SetupExitWatcher(Ptr, std::move(OnExit)); + + return Ptr; +} + +void +SubprocessManager::Impl::Remove(int Pid) +{ + UnregisterFromMetrics(Pid); + + RwLock::ExclusiveLockScope $(m_Lock); + auto It = m_Processes.find(Pid); + if (It != m_Processes.end()) + { + It->second->m_Impl->CancelAll(); + m_Processes.erase(It); + } +} + +void +SubprocessManager::Impl::RemoveAll() +{ + { + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); + } + + { + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.clear(); + m_KeyOrder.clear(); + m_NextSampleIndex = 0; + } +} + +void +SubprocessManager::Impl::EnqueueMetricsTimer() +{ + if (!m_MetricsTimer || !m_Running.load()) + { + return; + } + + m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs)); + m_MetricsTimer->async_wait([this](const asio::error_code& Ec) { + if (Ec || !m_Running.load()) + { + return; + } + + SampleBatch(); + EnqueueMetricsTimer(); + }); +} + +void +SubprocessManager::Impl::SampleBatch() +{ + RwLock::SharedLockScope $(m_MetricsLock); + + if (m_KeyOrder.empty()) + { + return; + } + + size_t Remaining = std::min(static_cast<size_t>(m_Config.MetricsBatchSize), m_KeyOrder.size()); + + while (Remaining > 0) + { + if (m_NextSampleIndex >= m_KeyOrder.size()) + { + m_NextSampleIndex = 0; + } + + int Pid = m_KeyOrder[m_NextSampleIndex]; + auto It = m_AllProcesses.find(Pid); + + if (It != m_AllProcesses.end()) + { + It->second->m_Impl->SampleMetrics(); + } + + m_NextSampleIndex++; + Remaining--; + } +} + +std::vector<TrackedProcessEntry> +SubprocessManager::Impl::GetMetricsSnapshot() const +{ + std::vector<TrackedProcessEntry> Result; + + RwLock::SharedLockScope $(m_MetricsLock); + Result.reserve(m_AllProcesses.size()); + + for (const auto& [Pid, Proc] : m_AllProcesses) + { + TrackedProcessEntry Entry; + Entry.Pid = Pid; + Entry.Metrics = Proc->m_Impl->m_LastMetrics; + Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); + Result.push_back(std::move(Entry)); + } + + return Result; +} + +AggregateProcessMetrics +SubprocessManager::Impl::GetAggregateMetrics() const +{ + AggregateProcessMetrics Agg; + + RwLock::SharedLockScope $(m_MetricsLock); + + for (const auto& [Pid, Proc] : m_AllProcesses) + { + const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; + Agg.TotalWorkingSetSize += M.WorkingSetSize; + Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; + Agg.TotalUserTimeMs += M.UserTimeMs; + Agg.TotalKernelTimeMs += M.KernelTimeMs; + Agg.ProcessCount++; + } + + return Agg; +} + +size_t +SubprocessManager::Impl::GetProcessCount() const +{ + RwLock::SharedLockScope $(m_MetricsLock); + return m_AllProcesses.size(); +} + +void +SubprocessManager::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + RwLock::SharedLockScope $(m_MetricsLock); + for (const auto& [Pid, Proc] : m_AllProcesses) + { + Callback(*Proc); + } +} + +ProcessGroup* +SubprocessManager::Impl::CreateGroup(std::string Name) +{ + auto GroupImpl = std::make_unique<ProcessGroup::Impl>(std::move(Name), *this, m_IoContext); + ProcessGroup* Ptr = nullptr; + + auto Group = std::unique_ptr<ProcessGroup>(new ProcessGroup(std::move(GroupImpl))); + Ptr = Group.get(); + + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups[std::string(Ptr->GetName())] = std::move(Group); + + return Ptr; +} + +void +SubprocessManager::Impl::DestroyGroup(std::string_view Name) +{ + RwLock::ExclusiveLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + It->second->KillAll(); + m_Groups.erase(It); + } +} + +ProcessGroup* +SubprocessManager::Impl::FindGroup(std::string_view Name) const +{ + RwLock::SharedLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + return It->second.get(); + } + return nullptr; +} + +void +SubprocessManager::Impl::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const +{ + RwLock::SharedLockScope $(m_GroupsLock); + for (const auto& [Name, Group] : m_Groups) + { + Callback(*Group); + } +} + +// ============================================================================ +// SubprocessManager +// ============================================================================ + +SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config) +: m_Impl(std::make_unique<Impl>(IoContext, Config)) +{ +} + +SubprocessManager::~SubprocessManager() = default; + +ManagedProcess* +SubprocessManager::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("SubprocessManager::Spawn"); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); +} + +ManagedProcess* +SubprocessManager::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("SubprocessManager::Adopt"); + return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); +} + +void +SubprocessManager::Remove(int Pid) +{ + ZEN_TRACE_CPU("SubprocessManager::Remove"); + m_Impl->Remove(Pid); +} + +void +SubprocessManager::RemoveAll() +{ + ZEN_TRACE_CPU("SubprocessManager::RemoveAll"); + m_Impl->RemoveAll(); +} + +void +SubprocessManager::SetDefaultStdoutCallback(ProcessDataCallback Callback) +{ + m_Impl->SetDefaultStdoutCallback(std::move(Callback)); +} + +void +SubprocessManager::SetDefaultStderrCallback(ProcessDataCallback Callback) +{ + m_Impl->SetDefaultStderrCallback(std::move(Callback)); +} + +std::vector<TrackedProcessEntry> +SubprocessManager::GetMetricsSnapshot() const +{ + return m_Impl->GetMetricsSnapshot(); +} + +AggregateProcessMetrics +SubprocessManager::GetAggregateMetrics() const +{ + return m_Impl->GetAggregateMetrics(); +} + +size_t +SubprocessManager::GetProcessCount() const +{ + return m_Impl->GetProcessCount(); +} + +void +SubprocessManager::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + m_Impl->Enumerate(std::move(Callback)); +} + +ProcessGroup* +SubprocessManager::CreateGroup(std::string Name) +{ + ZEN_TRACE_CPU("SubprocessManager::CreateGroup"); + return m_Impl->CreateGroup(std::move(Name)); +} + +void +SubprocessManager::DestroyGroup(std::string_view Name) +{ + ZEN_TRACE_CPU("SubprocessManager::DestroyGroup"); + m_Impl->DestroyGroup(Name); +} + +ProcessGroup* +SubprocessManager::FindGroup(std::string_view Name) const +{ + return m_Impl->FindGroup(Name); +} + +void +SubprocessManager::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const +{ + m_Impl->EnumerateGroups(std::move(Callback)); +} + +// ============================================================================ +// ProcessGroup::Impl +// ============================================================================ + +struct ProcessGroup::Impl +{ + std::string m_Name; + SubprocessManager::Impl& m_Manager; + asio::io_context& m_IoContext; + + mutable RwLock m_Lock; + std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; + +#if ZEN_PLATFORM_WINDOWS + JobObject m_JobObject; +#else + int m_Pgid = 0; +#endif + + Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext); + ~Impl(); + + ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process); + + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + void Remove(int Pid); + void KillAll(); + + AggregateProcessMetrics GetAggregateMetrics() const; + std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + [[nodiscard]] size_t GetProcessCount() const; + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; +}; + +// ============================================================================ +// ProcessGroup::Impl method definitions +// ============================================================================ + +ProcessGroup::Impl::Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext) +: m_Name(std::move(Name)) +, m_Manager(Manager) +, m_IoContext(IoContext) +{ +#if ZEN_PLATFORM_WINDOWS + m_JobObject.Initialize(); +#endif +} + +ProcessGroup::Impl::~Impl() +{ + KillAll(); +} + +ManagedProcess* +ProcessGroup::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process) +{ + int Pid = Process->Pid(); + ManagedProcess* Ptr = Process.get(); + + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + + m_Manager.RegisterForMetrics(Pid, Ptr); + return Ptr; +} + +ManagedProcess* +ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + bool HasStdout = Options.StdoutPipe != nullptr; + bool HasStderr = Options.StderrPipe != nullptr; + +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + Options.AssignToJob = &m_JobObject; + } +#else + if (m_Pgid > 0) + { + Options.ProcessGroupId = m_Pgid; + } +#endif + + CreateProcResult Result = CreateProc(Executable, CommandLine, Options); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); +#if ZEN_PLATFORM_WINDOWS + ImplPtr->m_Handle.Initialize(Result); +#else + int Pid = static_cast<int>(Result); + ImplPtr->m_Handle.Initialize(Pid); + + // First process becomes the group leader + if (m_Pgid == 0) + { + m_Pgid = Pid; + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + if (HasStdout) + { + m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); + } + if (HasStderr) + { + m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); + } + + return Ptr; +} + +ManagedProcess* +ProcessGroup::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + int Pid = Handle.Pid(); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); + ImplPtr->m_Handle.Initialize(Pid); + Handle.Reset(); + +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle()); + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + return Ptr; +} + +void +ProcessGroup::Impl::Remove(int Pid) +{ + m_Manager.UnregisterFromMetrics(Pid); + + RwLock::ExclusiveLockScope $(m_Lock); + auto It = m_Processes.find(Pid); + if (It != m_Processes.end()) + { + It->second->m_Impl->CancelAll(); + m_Processes.erase(It); + } +} + +void +ProcessGroup::Impl::KillAll() +{ +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + TerminateJobObject(static_cast<HANDLE>(m_JobObject.Handle()), 1); + } +#else + if (m_Pgid > 0) + { + kill(-m_Pgid, SIGTERM); + } +#endif + // Also kill individually as fallback and clean up + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + if (Proc->IsRunning()) + { + Proc->Kill(); + } + m_Manager.UnregisterFromMetrics(Pid); + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); +} + +AggregateProcessMetrics +ProcessGroup::Impl::GetAggregateMetrics() const +{ + AggregateProcessMetrics Agg; + + RwLock::SharedLockScope $(m_Lock); + + for (const auto& [Pid, Proc] : m_Processes) + { + const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; + Agg.TotalWorkingSetSize += M.WorkingSetSize; + Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; + Agg.TotalUserTimeMs += M.UserTimeMs; + Agg.TotalKernelTimeMs += M.KernelTimeMs; + Agg.ProcessCount++; + } + + return Agg; +} + +std::vector<TrackedProcessEntry> +ProcessGroup::Impl::GetMetricsSnapshot() const +{ + std::vector<TrackedProcessEntry> Result; + + RwLock::SharedLockScope $(m_Lock); + Result.reserve(m_Processes.size()); + + for (const auto& [Pid, Proc] : m_Processes) + { + TrackedProcessEntry Entry; + Entry.Pid = Pid; + Entry.Metrics = Proc->m_Impl->m_LastMetrics; + Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); + Result.push_back(std::move(Entry)); + } + + return Result; +} + +size_t +ProcessGroup::Impl::GetProcessCount() const +{ + RwLock::SharedLockScope $(m_Lock); + return m_Processes.size(); +} + +void +ProcessGroup::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + RwLock::SharedLockScope $(m_Lock); + for (const auto& [Pid, Proc] : m_Processes) + { + Callback(*Proc); + } +} + +// ============================================================================ +// ProcessGroup +// ============================================================================ + +ProcessGroup::ProcessGroup(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl)) +{ +} + +ProcessGroup::~ProcessGroup() = default; + +std::string_view +ProcessGroup::GetName() const +{ + return m_Impl->m_Name; +} + +ManagedProcess* +ProcessGroup::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("ProcessGroup::Spawn"); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); +} + +ManagedProcess* +ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("ProcessGroup::Adopt"); + return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); +} + +void +ProcessGroup::Remove(int Pid) +{ + ZEN_TRACE_CPU("ProcessGroup::Remove"); + m_Impl->Remove(Pid); +} + +void +ProcessGroup::KillAll() +{ + ZEN_TRACE_CPU("ProcessGroup::KillAll"); + m_Impl->KillAll(); +} + +AggregateProcessMetrics +ProcessGroup::GetAggregateMetrics() const +{ + return m_Impl->GetAggregateMetrics(); +} + +std::vector<TrackedProcessEntry> +ProcessGroup::GetMetricsSnapshot() const +{ + return m_Impl->GetMetricsSnapshot(); +} + +size_t +ProcessGroup::GetProcessCount() const +{ + return m_Impl->GetProcessCount(); +} + +void +ProcessGroup::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + m_Impl->Enumerate(std::move(Callback)); +} + +} // namespace zen + +// ============================================================================ +// Tests +// ============================================================================ + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> + +# include <chrono> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <asio/io_context.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace zen; +using namespace std::literals; + +void +zen::subprocessmanager_forcelink() +{ +} + +namespace { + +std::filesystem::path +GetAppStubPath() +{ + std::error_code Ec; + std::filesystem::path SelfPath = GetProcessExecutablePath(zen::GetCurrentProcessId(), Ec); + return SelfPath.parent_path() / "zentest-appstub" ZEN_EXE_SUFFIX_LITERAL; +} + +} // namespace + +TEST_SUITE_BEGIN("util.subprocessmanager"); + +TEST_CASE("SubprocessManager.SpawnAndDetectExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=42"; + + int ReceivedExitCode = -1; + bool CallbackFired = false; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { + ReceivedExitCode = ExitCode; + CallbackFired = true; + }); + + IoContext.run_for(5s); + + CHECK(CallbackFired); + CHECK(ReceivedExitCode == 42); +} + +TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string(); + + int ReceivedExitCode = -1; + bool CallbackFired = false; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { + ReceivedExitCode = ExitCode; + CallbackFired = true; + }); + + IoContext.run_for(5s); + + CHECK(CallbackFired); + CHECK(ReceivedExitCode == 0); +} + +TEST_CASE("SubprocessManager.StdoutCapture") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echo=hello_world"; + + StdoutPipeHandles StdoutPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + IoContext.run_for(5s); + + CHECK(Exited); + std::string Captured = Proc->GetCapturedStdout(); + CHECK(Captured.find("hello_world") != std::string::npos); +} + +TEST_CASE("SubprocessManager.StderrCapture") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echoerr=error_msg"; + + StdoutPipeHandles StdoutPipe; + StdoutPipeHandles StderrPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + REQUIRE(CreateOverlappedStdoutPipe(StderrPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + Options.StderrPipe = &StderrPipe; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + IoContext.run_for(5s); + + CHECK(Exited); + std::string CapturedErr = Proc->GetCapturedStderr(); + CHECK(CapturedErr.find("error_msg") != std::string::npos); +} + +TEST_CASE("SubprocessManager.StdoutCallback") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echo=callback_test"; + + StdoutPipeHandles StdoutPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + + std::string ReceivedData; + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); + + IoContext.run_for(5s); + + CHECK(Exited); + CHECK(ReceivedData.find("callback_test") != std::string::npos); + // When a callback is set, accumulated buffer should be empty + CHECK(Proc->GetCapturedStdout().empty()); +} + +TEST_CASE("SubprocessManager.MetricsSampling") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=2"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + // Run for enough time to get metrics samples + IoContext.run_for(1s); + + ProcessMetrics Metrics = Proc->GetLatestMetrics(); + CHECK(Metrics.WorkingSetSize > 0); + + auto Snapshot = Manager.GetMetricsSnapshot(); + CHECK(Snapshot.size() == 1); + + // Let it finish + IoContext.run_for(3s); + CHECK(Exited); +} + +TEST_CASE("SubprocessManager.RemoveWhileRunning") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=10"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool CallbackFired = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); + + int Pid = Proc->Pid(); + + // Let it start + IoContext.run_for(100ms); + + // Remove without killing — callback should NOT fire after this + Manager.Remove(Pid); + + IoContext.run_for(500ms); + + CHECK_FALSE(CallbackFired); + CHECK(Manager.GetProcessCount() == 0); +} + +TEST_CASE("SubprocessManager.KillAndWaitForExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=60"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool CallbackFired = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); + + // Let it start + IoContext.run_for(200ms); + + Proc->Kill(); + + IoContext.run_for(2s); + + CHECK(CallbackFired); +} + +TEST_CASE("SubprocessManager.AdoptProcess") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=7"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + CreateProcResult Result = CreateProc(AppStub, CmdLine, Options); + + int ReceivedExitCode = -1; + + Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; }); + + IoContext.run_for(5s); + + CHECK(ReceivedExitCode == 7); +} + +TEST_CASE("SubprocessManager.UserTag") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=0"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string ReceivedTag; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess& P, int) { ReceivedTag = std::string(P.GetTag()); }); + + Proc->SetTag("my-worker-1"); + CHECK(Proc->GetTag() == "my-worker-1"); + + IoContext.run_for(5s); + + CHECK(ReceivedTag == "my-worker-1"); +} + +TEST_CASE("ProcessGroup.SpawnAndMembership") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("test-group"); + REQUIRE(Group != nullptr); + CHECK(Group->GetName() == "test-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine1 = AppStub.string() + " -f=0"; + std::string CmdLine2 = AppStub.string() + " -f=1"; + + Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 2); + + IoContext.run_for(5s); + + CHECK(ExitCount == 2); +} + +TEST_CASE("ProcessGroup.KillAll") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("kill-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + // Let them start + IoContext.run_for(200ms); + CHECK(Group->GetProcessCount() == 2); + + Group->KillAll(); + CHECK(Group->GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.AggregateMetrics") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); + + ProcessGroup* Group = Manager.CreateGroup("metrics-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=3"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + // Wait for metrics sampling + IoContext.run_for(1s); + + AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); + CHECK(GroupAgg.ProcessCount == 2); + CHECK(GroupAgg.TotalWorkingSetSize > 0); + + // Manager-level metrics should include group processes + AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics(); + CHECK(ManagerAgg.ProcessCount == 2); + + Group->KillAll(); +} + +TEST_CASE("ProcessGroup.DestroyGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("destroy-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + IoContext.run_for(200ms); + CHECK(Manager.GetProcessCount() == 2); + + Manager.DestroyGroup("destroy-group"); + + CHECK(Manager.FindGroup("destroy-group") == nullptr); + CHECK(Manager.GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("mixed-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int GroupExitCount = 0; + int UngroupedExitCode = -1; + + std::string CmdLine = AppStub.string() + " -f=0"; + + // Grouped processes + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + + // Ungrouped process + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 3); + + IoContext.run_for(5s); + + CHECK(GroupExitCount == 2); + CHECK(UngroupedExitCode == 0); +} + +TEST_CASE("ProcessGroup.FindGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + CHECK(Manager.FindGroup("nonexistent") == nullptr); + + ProcessGroup* Group = Manager.CreateGroup("findable"); + CHECK(Manager.FindGroup("findable") == Group); + CHECK(Manager.FindGroup("findable")->GetName() == "findable"); +} + +TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) +{ + // Seed for reproducibility — change to explore different orderings + // + // Note that while this is a stress test, it is still single-threaded + + constexpr uint32_t Seed = 42; + std::mt19937 Rng(Seed); + ZEN_INFO("StressTest: seed={}", Seed); + + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 200, .MetricsBatchSize = 32}); + + std::filesystem::path AppStub = GetAppStubPath(); + CreateProcOptions BaseOptions; + BaseOptions.Flags = CreateProcOptions::Flag_NoConsole; + + std::atomic<int> TotalExitCallbacks{0}; + std::atomic<int> KilledGroupProcessCount{0}; + + auto MakeExitCallback = [&](std::atomic<int>& Counter) { + return [&Counter, &TotalExitCallbacks](ManagedProcess&, int) { + Counter++; + TotalExitCallbacks++; + }; + }; + + // ======================================================================== + // Phase 1: Spawn multiple groups with varied workloads + // ======================================================================== + + ZEN_INFO("StressTest: Phase 1 — spawning initial groups"); + + constexpr int NumInitialGroups = 8; + std::vector<std::string> GroupNames; + std::vector<std::atomic<int>> GroupExitCounts(NumInitialGroups); + std::uniform_int_distribution<int> ProcCountDist(5, 100); + std::uniform_int_distribution<int> SleepDist(1, 5); + std::uniform_int_distribution<int> ExitCodeDist(0, 10); + std::uniform_int_distribution<int> WorkloadDist(0, 2); // 0=sleep, 1=exit-code, 2=echo+exit + int TotalPhase1Spawned = 0; + + for (int G = 0; G < NumInitialGroups; G++) + { + std::string GroupName = fmt::format("stress-group-{}", G); + ProcessGroup* Group = Manager.CreateGroup(GroupName); + GroupNames.push_back(GroupName); + + int ProcCount = ProcCountDist(Rng); + for (int P = 0; P < ProcCount; P++) + { + std::string CmdLine; + int Workload = WorkloadDist(Rng); + if (Workload == 0) + { + int Sleep = SleepDist(Rng); + CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); + } + else if (Workload == 1) + { + int Code = ExitCodeDist(Rng); + CmdLine = fmt::format("{} -f={}", AppStub.string(), Code); + } + else + { + int Code = ExitCodeDist(Rng); + CmdLine = fmt::format("{} -echo=stress_g{}_p{} -f={}", AppStub.string(), G, P, Code); + } + + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(GroupExitCounts[G])); + TotalPhase1Spawned++; + } + + ZEN_INFO("StressTest: group '{}' spawned {} processes", GroupName, ProcCount); + } + + ZEN_INFO("StressTest: Phase 1 total spawned: {}", TotalPhase1Spawned); + + // Let processes start running and some short-lived ones exit + IoContext.run_for(1s); + + // ======================================================================== + // Phase 2: Randomly kill some groups, create replacements, add ungrouped + // ======================================================================== + + ZEN_INFO("StressTest: Phase 2 — random group kills and replacements"); + + constexpr int NumGroupsToKill = 3; + + // Pick random groups to kill + std::vector<int> GroupIndices(NumInitialGroups); + std::iota(GroupIndices.begin(), GroupIndices.end(), 0); + std::shuffle(GroupIndices.begin(), GroupIndices.end(), Rng); + + std::vector<int> KilledIndices(GroupIndices.begin(), GroupIndices.begin() + NumGroupsToKill); + + for (int Idx : KilledIndices) + { + ProcessGroup* Group = Manager.FindGroup(GroupNames[Idx]); + if (Group) + { + size_t Count = Group->GetProcessCount(); + ZEN_INFO("StressTest: killing group '{}' ({} processes)", GroupNames[Idx], Count); + Manager.DestroyGroup(GroupNames[Idx]); + } + } + + // Let kills propagate + IoContext.run_for(500ms); + + // Create replacement groups + std::atomic<int> ReplacementExitCount{0}; + std::uniform_int_distribution<int> ReplacementCountDist(3, 10); + + for (int R = 0; R < NumGroupsToKill; R++) + { + std::string Name = fmt::format("replacement-group-{}", R); + ProcessGroup* Group = Manager.CreateGroup(Name); + int Count = ReplacementCountDist(Rng); + + for (int P = 0; P < Count; P++) + { + int Sleep = SleepDist(Rng); + std::string CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ReplacementExitCount)); + } + + ZEN_INFO("StressTest: replacement group '{}' spawned {} processes", Name, Count); + } + + // Also spawn some ungrouped processes + std::atomic<int> UngroupedExitCount{0}; + constexpr int NumUngrouped = 10; + + for (int U = 0; U < NumUngrouped; U++) + { + int ExitCode = ExitCodeDist(Rng); + std::string CmdLine = fmt::format("{} -f={}", AppStub.string(), ExitCode); + Manager.Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(UngroupedExitCount)); + } + + ZEN_INFO("StressTest: spawned {} ungrouped processes", NumUngrouped); + + // Let things run + IoContext.run_for(2s); + + // ======================================================================== + // Phase 3: Rapid spawn/exit churn + // ======================================================================== + + ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn"); + + std::atomic<int> ChurnExitCount{0}; + int TotalChurnSpawned = 0; + constexpr int NumChurnBatches = 10; + std::uniform_int_distribution<int> ChurnBatchSizeDist(10, 20); + + for (int Batch = 0; Batch < NumChurnBatches; Batch++) + { + std::string Name = fmt::format("churn-batch-{}", Batch); + ProcessGroup* Group = Manager.CreateGroup(Name); + int Count = ChurnBatchSizeDist(Rng); + + for (int P = 0; P < Count; P++) + { + // Immediate exit processes to stress spawn/exit path + std::string CmdLine = fmt::format("{} -f=0", AppStub.string()); + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ChurnExitCount)); + TotalChurnSpawned++; + } + + // Brief pump to allow some exits to be processed + IoContext.run_for(200ms); + + // Destroy the group — any still-running processes get killed + Manager.DestroyGroup(Name); + } + + ZEN_INFO("StressTest: Phase 3 spawned {} churn processes across {} batches", TotalChurnSpawned, NumChurnBatches); + + // ======================================================================== + // Phase 4: Drain and verify + // ======================================================================== + + ZEN_INFO("StressTest: Phase 4 — draining remaining processes"); + + // Check metrics were collected before we wind down + AggregateProcessMetrics Agg = Manager.GetAggregateMetrics(); + ZEN_INFO("StressTest: aggregate metrics: {} processes, {} bytes working set", Agg.ProcessCount, Agg.TotalWorkingSetSize); + + // Let remaining processes finish (replacement groups have up to 5s sleep) + IoContext.run_for(8s); + + // Kill anything still running + Manager.RemoveAll(); + + // Final pump to process any remaining callbacks + IoContext.run_for(1s); + + ZEN_INFO("StressTest: Results:"); + ZEN_INFO("StressTest: total exit callbacks fired: {}", TotalExitCallbacks.load()); + ZEN_INFO("StressTest: ungrouped exits: {}", UngroupedExitCount.load()); + ZEN_INFO("StressTest: replacement exits: {}", ReplacementExitCount.load()); + ZEN_INFO("StressTest: churn exits: {}", ChurnExitCount.load()); + + // Verify the manager is clean + CHECK(Manager.GetProcessCount() == 0); + + // Ungrouped processes should all have exited (they were immediate-exit) + CHECK(UngroupedExitCount.load() == NumUngrouped); + + // Verify we got a reasonable number of total callbacks + // (exact count is hard to predict due to killed groups, but should be > 0) + CHECK(TotalExitCallbacks.load() > 0); + + ZEN_INFO("StressTest: PASSED — seed={}", Seed); +} + +TEST_SUITE_END(); + +#endif diff --git a/src/zenutil/processmetricstracker.cpp b/src/zenutil/processmetricstracker.cpp deleted file mode 100644 index 555d0ae1a..000000000 --- a/src/zenutil/processmetricstracker.cpp +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/processmetricstracker.h> - -#include <zencore/thread.h> -#include <zencore/timer.h> - -#include <algorithm> -#include <thread> -#include <unordered_map> -#include <vector> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <asio/io_context.hpp> -#include <asio/steady_timer.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -struct ProcessMetricsTracker::Impl -{ - static constexpr size_t kBatchSize = 8; - - struct Entry - { - ProcessHandle Handle; - ProcessMetrics LastMetrics; - float CpuUsagePercent = -1.0f; - - uint64_t PrevUserTimeMs = 0; - uint64_t PrevKernelTimeMs = 0; - uint64_t PrevSampleTicks = 0; - }; - - uint64_t m_SampleIntervalMs; - - mutable RwLock m_Lock; - std::unordered_map<int, Entry> m_Entries; - size_t m_NextSampleIndex = 0; - std::vector<int> m_KeyOrder; - - std::atomic<bool> m_Running{false}; - - // Thread-based sampling - std::thread m_Thread; - Event m_StopEvent; - - // Timer-based sampling - std::unique_ptr<asio::steady_timer> m_Timer; - - explicit Impl(uint64_t SampleIntervalMs) : m_SampleIntervalMs(SampleIntervalMs) {} - - Impl(asio::io_context& IoContext, uint64_t SampleIntervalMs) - : m_SampleIntervalMs(SampleIntervalMs) - , m_Timer(std::make_unique<asio::steady_timer>(IoContext)) - { - } - - ~Impl() { Stop(); } - - void Start() - { - if (m_Running.exchange(true)) - { - return; - } - - if (m_Timer) - { - EnqueueTimer(); - } - else - { - m_Thread = std::thread([this] { SamplingLoop(); }); - } - } - - void Stop() - { - if (!m_Running.exchange(false)) - { - return; - } - - if (m_Timer) - { - m_Timer->cancel(); - } - - if (m_Thread.joinable()) - { - m_StopEvent.Set(); - m_Thread.join(); - } - } - - void Add(const ProcessHandle& Handle) - { - int Pid = Handle.Pid(); - - RwLock::ExclusiveLockScope $(m_Lock); - - auto It = m_Entries.find(Pid); - if (It != m_Entries.end()) - { - m_Entries.erase(It); - } - else - { - m_KeyOrder.push_back(Pid); - } - - auto [NewIt, Inserted] = m_Entries.try_emplace(Pid); - NewIt->second.Handle.Initialize(Pid); - } - - void Remove(int Pid) - { - RwLock::ExclusiveLockScope $(m_Lock); - m_Entries.erase(Pid); - m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); - - if (m_NextSampleIndex >= m_KeyOrder.size()) - { - m_NextSampleIndex = 0; - } - } - - void Clear() - { - RwLock::ExclusiveLockScope $(m_Lock); - m_Entries.clear(); - m_KeyOrder.clear(); - m_NextSampleIndex = 0; - } - - std::vector<TrackedProcessEntry> GetSnapshot() const - { - std::vector<TrackedProcessEntry> Result; - - RwLock::SharedLockScope $(m_Lock); - Result.reserve(m_Entries.size()); - - for (const auto& [Pid, E] : m_Entries) - { - TrackedProcessEntry Out; - Out.Pid = Pid; - Out.Metrics = E.LastMetrics; - Out.CpuUsagePercent = E.CpuUsagePercent; - Result.push_back(std::move(Out)); - } - - return Result; - } - - AggregateProcessMetrics GetAggregate() const - { - AggregateProcessMetrics Agg; - - RwLock::SharedLockScope $(m_Lock); - - for (const auto& [Pid, E] : m_Entries) - { - Agg.TotalWorkingSetSize += E.LastMetrics.WorkingSetSize; - Agg.TotalPeakWorkingSetSize += E.LastMetrics.PeakWorkingSetSize; - Agg.TotalUserTimeMs += E.LastMetrics.UserTimeMs; - Agg.TotalKernelTimeMs += E.LastMetrics.KernelTimeMs; - Agg.ProcessCount++; - } - - return Agg; - } - - void SampleBatch() - { - RwLock::SharedLockScope $(m_Lock); - - if (m_KeyOrder.empty()) - { - return; - } - - const uint64_t NowTicks = GetHifreqTimerValue(); - size_t Remaining = std::min(kBatchSize, m_KeyOrder.size()); - - while (Remaining > 0) - { - if (m_NextSampleIndex >= m_KeyOrder.size()) - { - m_NextSampleIndex = 0; - } - - int Pid = m_KeyOrder[m_NextSampleIndex]; - auto It = m_Entries.find(Pid); - - if (It == m_Entries.end()) - { - m_NextSampleIndex++; - Remaining--; - continue; - } - - Entry& E = It->second; - - ProcessMetrics Metrics; - GetProcessMetrics(E.Handle, Metrics); - - if (E.PrevSampleTicks > 0) - { - uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - E.PrevSampleTicks); - uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (E.PrevUserTimeMs + E.PrevKernelTimeMs); - if (ElapsedMs > 0) - { - E.CpuUsagePercent = static_cast<float>(static_cast<double>(DeltaCpuTimeMs) / ElapsedMs * 100.0); - } - } - - E.PrevUserTimeMs = Metrics.UserTimeMs; - E.PrevKernelTimeMs = Metrics.KernelTimeMs; - E.PrevSampleTicks = NowTicks; - E.LastMetrics = Metrics; - - m_NextSampleIndex++; - Remaining--; - } - } - - void SamplingLoop() - { - while (!m_StopEvent.Wait(static_cast<int>(m_SampleIntervalMs))) - { - if (!m_Running.load()) - { - return; - } - - SampleBatch(); - } - } - - void EnqueueTimer() - { - if (!m_Timer || !m_Running.load()) - { - return; - } - - m_Timer->expires_after(std::chrono::milliseconds(m_SampleIntervalMs)); - m_Timer->async_wait([this](const asio::error_code& Ec) { - if (Ec || !m_Running.load()) - { - return; - } - - SampleBatch(); - EnqueueTimer(); - }); - } -}; - -////////////////////////////////////////////////////////////////////////// - -ProcessMetricsTracker::ProcessMetricsTracker(uint64_t SampleIntervalMs) : m_Impl(std::make_unique<Impl>(SampleIntervalMs)) -{ -} - -ProcessMetricsTracker::ProcessMetricsTracker(asio::io_context& IoContext, uint64_t SampleIntervalMs) -: m_Impl(std::make_unique<Impl>(IoContext, SampleIntervalMs)) -{ -} - -ProcessMetricsTracker::~ProcessMetricsTracker() = default; - -void -ProcessMetricsTracker::Start() -{ - m_Impl->Start(); -} - -void -ProcessMetricsTracker::Stop() -{ - m_Impl->Stop(); -} - -void -ProcessMetricsTracker::Add(const ProcessHandle& Handle) -{ - m_Impl->Add(Handle); -} - -void -ProcessMetricsTracker::Remove(int Pid) -{ - m_Impl->Remove(Pid); -} - -void -ProcessMetricsTracker::Clear() -{ - m_Impl->Clear(); -} - -std::vector<TrackedProcessEntry> -ProcessMetricsTracker::GetSnapshot() const -{ - return m_Impl->GetSnapshot(); -} - -AggregateProcessMetrics -ProcessMetricsTracker::GetAggregate() const -{ - return m_Impl->GetAggregate(); -} - -} // namespace zen - -#if ZEN_WITH_TESTS - -# include <zencore/testing.h> - -using namespace zen; - -void -zen::processmetricstracker_forcelink() -{ -} - -TEST_SUITE_BEGIN("util.processmetricstracker"); - -TEST_CASE("ProcessMetricsTracker.SelfProcess") -{ - ProcessMetricsTracker Tracker(100); - Tracker.Start(); - - ProcessHandle Handle; - Handle.Initialize(zen::GetCurrentProcessId()); - REQUIRE(Handle.IsValid()); - - int Pid = Handle.Pid(); - Tracker.Add(Handle); - - // Wait for at least two samples so CPU% is computed - std::this_thread::sleep_for(std::chrono::milliseconds(350)); - - auto Snapshot = Tracker.GetSnapshot(); - REQUIRE(Snapshot.size() == 1); - CHECK(Snapshot[0].Pid == Pid); - CHECK(Snapshot[0].Metrics.WorkingSetSize > 0); - CHECK(Snapshot[0].Metrics.MemoryBytes > 0); - CHECK((Snapshot[0].Metrics.UserTimeMs + Snapshot[0].Metrics.KernelTimeMs) > 0); - CHECK(Snapshot[0].CpuUsagePercent >= 0.0f); - - auto Agg = Tracker.GetAggregate(); - CHECK(Agg.ProcessCount == 1); - CHECK(Agg.TotalWorkingSetSize > 0); - - Tracker.Remove(Pid); - - Snapshot = Tracker.GetSnapshot(); - CHECK(Snapshot.empty()); - - Tracker.Stop(); -} - -TEST_CASE("ProcessMetricsTracker.AsioTimer") -{ - asio::io_context IoContext; - - ProcessMetricsTracker Tracker(IoContext, 100); - Tracker.Start(); - - ProcessHandle Handle; - Handle.Initialize(zen::GetCurrentProcessId()); - REQUIRE(Handle.IsValid()); - - Tracker.Add(Handle); - - // Run the io_context for enough time to get two samples - IoContext.run_for(std::chrono::milliseconds(350)); - - auto Snapshot = Tracker.GetSnapshot(); - REQUIRE(Snapshot.size() == 1); - CHECK(Snapshot[0].Metrics.WorkingSetSize > 0); - CHECK(Snapshot[0].CpuUsagePercent >= 0.0f); - - Tracker.Stop(); -} - -TEST_SUITE_END(); - -#endif diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 032f21c9b..2ca380c75 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -10,7 +10,7 @@ # include <zenutil/config/commandlineoptions.h> # include <zenutil/rpcrecording.h> # include <zenutil/splitconsole/logstreamlistener.h> -# include <zenutil/processmetricstracker.h> +# include <zenutil/process/subprocessmanager.h> # include <zenutil/wildcard.h> namespace zen { @@ -22,7 +22,7 @@ zenutil_forcelinktests() commandlineoptions_forcelink(); imdscredentials_forcelink(); logstreamlistener_forcelink(); - processmetricstracker_forcelink(); + subprocessmanager_forcelink(); s3client_forcelink(); sigv4_forcelink(); wildcard_forcelink(); |