diff options
| author | Stefan Boberg <[email protected]> | 2026-03-24 15:47:23 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-24 15:47:23 +0100 |
| commit | 21c2abb1bde697c31bee562465cb986a0429a299 (patch) | |
| tree | 3734d235e79a8fbed307ae5c248936d356553b61 /src/zenutil/process/subprocessmanager.cpp | |
| parent | v5.7.25 hotpatch (#874) (diff) | |
| download | zen-21c2abb1bde697c31bee562465cb986a0429a299.tar.xz zen-21c2abb1bde697c31bee562465cb986a0429a299.zip | |
Subprocess Manager (#889)
Adds a `SubprocessManager` for managing child processes with ASIO-integrated async exit detection, stdout/stderr pipe capture, and periodic metrics sampling. Also introduces `ProcessGroup` for OS-backed process grouping (Windows JobObjects / POSIX process groups).
### SubprocessManager
- Async process exit detection using platform-native mechanisms (Windows `object_handle`, Linux `pidfd_open`, macOS `kqueue EVFILT_PROC`) — no polling
- Stdout/stderr capture via async pipe readers with per-process or default callbacks
- Periodic round-robin metrics sampling (CPU, memory) across managed processes
- Spawn, adopt, remove, kill, and enumerate managed processes
### ProcessGroup
- OS-level process grouping: Windows JobObject (kill-on-close guarantee), POSIX `setpgid` (bulk signal delivery)
- Atomic group kill via `TerminateJobObject` (Windows) or `kill(-pgid, sig)` (POSIX)
- Per-group aggregate metrics and enumeration
### ProcessHandle improvements
- Added explicit constructors from `int` (pid) and `void*` (native handle)
- Added move constructor and move assignment operator
### ProcessMetricsTracker
- Cross-platform process metrics (CPU time, working set, page faults) via `QueryProcessMetrics()`
- ASIO timer-driven periodic sampling with configurable interval and batch size
- Aggregate metrics across tracked processes
### Other changes
- Fixed `zentest-appstub` writing a spurious `Versions` file to cwd on every invocation
Diffstat (limited to 'src/zenutil/process/subprocessmanager.cpp')
| -rw-r--r-- | src/zenutil/process/subprocessmanager.cpp | 1811 |
1 files changed, 1811 insertions, 0 deletions
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp new file mode 100644 index 000000000..3a91b0a61 --- /dev/null +++ b/src/zenutil/process/subprocessmanager.cpp @@ -0,0 +1,1811 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/process/subprocessmanager.h> + +#include "asyncpipereader.h" +#include "exitwatcher.h" + +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +#include <algorithm> +#include <atomic> +#include <numeric> +#include <random> +#include <string> +#include <unordered_map> +#include <vector> + +ZEN_THIRD_PARTY_INCLUDES_START +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#else +# include <csignal> +#endif +#include <asio/io_context.hpp> +#include <asio/steady_timer.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +// ============================================================================ +// ManagedProcess::Impl +// ============================================================================ + +struct ManagedProcess::Impl +{ + asio::io_context& m_IoContext; + ProcessHandle m_Handle; + ProcessExitWatcher m_ExitWatcher; + ProcessExitCallback m_ExitCallback; + std::atomic<bool> m_Exited{false}; + + // Stdout capture + std::unique_ptr<AsyncPipeReader> m_StdoutReader; + ProcessDataCallback m_StdoutCallback; + mutable RwLock m_StdoutLock; + std::string m_CapturedStdout; + + // Stderr capture + std::unique_ptr<AsyncPipeReader> m_StderrReader; + ProcessDataCallback m_StderrCallback; + mutable RwLock m_StderrLock; + std::string m_CapturedStderr; + + // Metrics + ProcessMetrics m_LastMetrics; + std::atomic<float> m_CpuUsagePercent{-1.0f}; + uint64_t m_PrevUserTimeMs = 0; + uint64_t m_PrevKernelTimeMs = 0; + uint64_t m_PrevSampleTicks = 0; + + // User tag + std::string m_Tag; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext), m_ExitWatcher(IoContext) {} + + void OnStdoutData(ManagedProcess& Self, std::string_view Data) + { + if (m_StdoutCallback) + { + m_StdoutCallback(Self, Data); + } + else + { + RwLock::ExclusiveLockScope $(m_StdoutLock); + m_CapturedStdout.append(Data); + } + } + + void OnStderrData(ManagedProcess& Self, std::string_view Data) + { + if (m_StderrCallback) + { + m_StderrCallback(Self, Data); + } + else + { + RwLock::ExclusiveLockScope $(m_StderrLock); + m_CapturedStderr.append(Data); + } + } + + void SampleMetrics() + { + if (m_Exited.load()) + { + return; + } + + ProcessMetrics Metrics; + GetProcessMetrics(m_Handle, Metrics); + + uint64_t NowTicks = GetHifreqTimerValue(); + + if (m_PrevSampleTicks > 0) + { + uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - m_PrevSampleTicks); + uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (m_PrevUserTimeMs + m_PrevKernelTimeMs); + if (ElapsedMs > 0) + { + m_CpuUsagePercent.store(static_cast<float>(static_cast<double>(DeltaCpuTimeMs) / ElapsedMs * 100.0)); + } + } + + m_PrevUserTimeMs = Metrics.UserTimeMs; + m_PrevKernelTimeMs = Metrics.KernelTimeMs; + m_PrevSampleTicks = NowTicks; + m_LastMetrics = Metrics; + } + + [[nodiscard]] int Pid() const { return m_Handle.Pid(); } + + [[nodiscard]] bool IsRunning() const { return !m_Exited.load() && m_Handle.IsValid() && m_Handle.IsRunning(); } + + [[nodiscard]] std::string GetCapturedStdout() const + { + RwLock::SharedLockScope $(m_StdoutLock); + return m_CapturedStdout; + } + + [[nodiscard]] std::string GetCapturedStderr() const + { + RwLock::SharedLockScope $(m_StderrLock); + return m_CapturedStderr; + } + + void CancelAll() + { + m_ExitWatcher.Cancel(); + if (m_StdoutReader) + { + m_StdoutReader->Stop(); + } + if (m_StderrReader) + { + m_StderrReader->Stop(); + } + } +}; + +// ============================================================================ +// ManagedProcess +// ============================================================================ + +ManagedProcess::ManagedProcess(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl)) +{ +} + +ManagedProcess::~ManagedProcess() +{ + if (m_Impl) + { + m_Impl->CancelAll(); + } +} + +int +ManagedProcess::Pid() const +{ + return m_Impl->Pid(); +} + +bool +ManagedProcess::IsRunning() const +{ + return m_Impl->IsRunning(); +} + +const ProcessHandle& +ManagedProcess::GetHandle() const +{ + return m_Impl->m_Handle; +} + +ProcessMetrics +ManagedProcess::GetLatestMetrics() const +{ + return m_Impl->m_LastMetrics; +} + +float +ManagedProcess::GetCpuUsagePercent() const +{ + return m_Impl->m_CpuUsagePercent.load(); +} + +void +ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback) +{ + m_Impl->m_StdoutCallback = std::move(Callback); +} + +void +ManagedProcess::SetStderrCallback(ProcessDataCallback Callback) +{ + m_Impl->m_StderrCallback = std::move(Callback); +} + +std::string +ManagedProcess::GetCapturedStdout() const +{ + return m_Impl->GetCapturedStdout(); +} + +std::string +ManagedProcess::GetCapturedStderr() const +{ + return m_Impl->GetCapturedStderr(); +} + +bool +ManagedProcess::Kill() +{ + return m_Impl->m_Handle.Kill(); +} + +bool +ManagedProcess::Terminate(int ExitCode) +{ + return m_Impl->m_Handle.Terminate(ExitCode); +} + +void +ManagedProcess::SetTag(std::string Tag) +{ + m_Impl->m_Tag = std::move(Tag); +} + +std::string_view +ManagedProcess::GetTag() const +{ + return m_Impl->m_Tag; +} + +// ============================================================================ +// SubprocessManager::Impl +// ============================================================================ + +struct SubprocessManager::Impl +{ + asio::io_context& m_IoContext; + SubprocessManagerConfig m_Config; + + // Ungrouped processes + mutable RwLock m_Lock; + std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; + + // Groups + mutable RwLock m_GroupsLock; + std::unordered_map<std::string, std::unique_ptr<ProcessGroup>> m_Groups; + + // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling + mutable RwLock m_MetricsLock; + std::unordered_map<int, ManagedProcess*> m_AllProcesses; // non-owning + std::vector<int> m_KeyOrder; + size_t m_NextSampleIndex = 0; + + ProcessDataCallback m_DefaultStdoutCallback; + ProcessDataCallback m_DefaultStderrCallback; + + std::unique_ptr<asio::steady_timer> m_MetricsTimer; + std::atomic<bool> m_Running{true}; + + explicit Impl(asio::io_context& IoContext, SubprocessManagerConfig Config); + ~Impl(); + + ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process); + void RegisterForMetrics(int Pid, ManagedProcess* Ptr); + void UnregisterFromMetrics(int Pid); + ManagedProcess* FindProcess(int Pid) const; + + void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit); + void SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); + void SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); + + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + void Remove(int Pid); + void RemoveAll(); + + void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); } + void SetDefaultStderrCallback(ProcessDataCallback Callback) { m_DefaultStderrCallback = std::move(Callback); } + + void EnqueueMetricsTimer(); + void SampleBatch(); + std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + AggregateProcessMetrics GetAggregateMetrics() const; + [[nodiscard]] size_t GetProcessCount() const; + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + + ProcessGroup* CreateGroup(std::string Name); + void DestroyGroup(std::string_view Name); + ProcessGroup* FindGroup(std::string_view Name) const; + void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const; +}; + +// ============================================================================ +// SubprocessManager::Impl method definitions +// ============================================================================ + +SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConfig Config) : m_IoContext(IoContext), m_Config(Config) +{ + if (m_Config.MetricsSampleIntervalMs > 0) + { + m_MetricsTimer = std::make_unique<asio::steady_timer>(IoContext); + EnqueueMetricsTimer(); + } +} + +SubprocessManager::Impl::~Impl() +{ + m_Running = false; + if (m_MetricsTimer) + { + m_MetricsTimer->cancel(); + } + + // Destroy groups first (they reference m_Manager back to us) + { + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups.clear(); + } + + RemoveAll(); +} + +ManagedProcess* +SubprocessManager::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process) +{ + int Pid = Process->Pid(); + ManagedProcess* Ptr = Process.get(); + + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + + RegisterForMetrics(Pid, Ptr); + return Ptr; +} + +void +SubprocessManager::Impl::RegisterForMetrics(int Pid, ManagedProcess* Ptr) +{ + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses[Pid] = Ptr; + m_KeyOrder.push_back(Pid); +} + +void +SubprocessManager::Impl::UnregisterFromMetrics(int Pid) +{ + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.erase(Pid); + m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); + if (m_NextSampleIndex >= m_KeyOrder.size()) + { + m_NextSampleIndex = 0; + } +} + +ManagedProcess* +SubprocessManager::Impl::FindProcess(int Pid) const +{ + RwLock::SharedLockScope $(m_MetricsLock); + auto It = m_AllProcesses.find(Pid); + if (It != m_AllProcesses.end()) + { + return It->second; + } + return nullptr; +} + +void +SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit) +{ + int Pid = Proc->Pid(); + + Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) { + ManagedProcess* Found = FindProcess(Pid); + + if (Found) + { + Found->m_Impl->m_Exited.store(true); + Callback(*Found, ExitCode); + } + }); +} + +void +SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) +{ + int Pid = Proc->Pid(); + Proc->m_Impl->m_StdoutReader = std::make_unique<AsyncPipeReader>(m_IoContext); + Proc->m_Impl->m_StdoutReader->Start( + std::move(Pipe), + [this, Pid](std::string_view Data) { + ManagedProcess* Found = FindProcess(Pid); + if (Found) + { + if (Found->m_Impl->m_StdoutCallback) + { + Found->m_Impl->m_StdoutCallback(*Found, Data); + } + else if (m_DefaultStdoutCallback) + { + m_DefaultStdoutCallback(*Found, Data); + } + else + { + Found->m_Impl->OnStdoutData(*Found, Data); + } + } + }, + [] {}); +} + +void +SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) +{ + int Pid = Proc->Pid(); + Proc->m_Impl->m_StderrReader = std::make_unique<AsyncPipeReader>(m_IoContext); + Proc->m_Impl->m_StderrReader->Start( + std::move(Pipe), + [this, Pid](std::string_view Data) { + ManagedProcess* Found = FindProcess(Pid); + if (Found) + { + if (Found->m_Impl->m_StderrCallback) + { + Found->m_Impl->m_StderrCallback(*Found, Data); + } + else if (m_DefaultStderrCallback) + { + m_DefaultStderrCallback(*Found, Data); + } + else + { + Found->m_Impl->OnStderrData(*Found, Data); + } + } + }, + [] {}); +} + +ManagedProcess* +SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + bool HasStdout = Options.StdoutPipe != nullptr; + bool HasStderr = Options.StderrPipe != nullptr; + + CreateProcResult Result = CreateProc(Executable, CommandLine, Options); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); +#if ZEN_PLATFORM_WINDOWS + ImplPtr->m_Handle.Initialize(Result); +#else + ImplPtr->m_Handle.Initialize(static_cast<int>(Result)); +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + SetupExitWatcher(Ptr, std::move(OnExit)); + + if (HasStdout) + { + SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); + } + if (HasStderr) + { + SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); + } + + return Ptr; +} + +ManagedProcess* +SubprocessManager::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + int Pid = Handle.Pid(); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); + ImplPtr->m_Handle.Initialize(Pid); + + // Reset the original handle so caller doesn't double-close + Handle.Reset(); + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + SetupExitWatcher(Ptr, std::move(OnExit)); + + return Ptr; +} + +void +SubprocessManager::Impl::Remove(int Pid) +{ + UnregisterFromMetrics(Pid); + + RwLock::ExclusiveLockScope $(m_Lock); + auto It = m_Processes.find(Pid); + if (It != m_Processes.end()) + { + It->second->m_Impl->CancelAll(); + m_Processes.erase(It); + } +} + +void +SubprocessManager::Impl::RemoveAll() +{ + { + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); + } + + { + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.clear(); + m_KeyOrder.clear(); + m_NextSampleIndex = 0; + } +} + +void +SubprocessManager::Impl::EnqueueMetricsTimer() +{ + if (!m_MetricsTimer || !m_Running.load()) + { + return; + } + + m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs)); + m_MetricsTimer->async_wait([this](const asio::error_code& Ec) { + if (Ec || !m_Running.load()) + { + return; + } + + SampleBatch(); + EnqueueMetricsTimer(); + }); +} + +void +SubprocessManager::Impl::SampleBatch() +{ + RwLock::SharedLockScope $(m_MetricsLock); + + if (m_KeyOrder.empty()) + { + return; + } + + size_t Remaining = std::min(static_cast<size_t>(m_Config.MetricsBatchSize), m_KeyOrder.size()); + + while (Remaining > 0) + { + if (m_NextSampleIndex >= m_KeyOrder.size()) + { + m_NextSampleIndex = 0; + } + + int Pid = m_KeyOrder[m_NextSampleIndex]; + auto It = m_AllProcesses.find(Pid); + + if (It != m_AllProcesses.end()) + { + It->second->m_Impl->SampleMetrics(); + } + + m_NextSampleIndex++; + Remaining--; + } +} + +std::vector<TrackedProcessEntry> +SubprocessManager::Impl::GetMetricsSnapshot() const +{ + std::vector<TrackedProcessEntry> Result; + + RwLock::SharedLockScope $(m_MetricsLock); + Result.reserve(m_AllProcesses.size()); + + for (const auto& [Pid, Proc] : m_AllProcesses) + { + TrackedProcessEntry Entry; + Entry.Pid = Pid; + Entry.Metrics = Proc->m_Impl->m_LastMetrics; + Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); + Result.push_back(std::move(Entry)); + } + + return Result; +} + +AggregateProcessMetrics +SubprocessManager::Impl::GetAggregateMetrics() const +{ + AggregateProcessMetrics Agg; + + RwLock::SharedLockScope $(m_MetricsLock); + + for (const auto& [Pid, Proc] : m_AllProcesses) + { + const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; + Agg.TotalWorkingSetSize += M.WorkingSetSize; + Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; + Agg.TotalUserTimeMs += M.UserTimeMs; + Agg.TotalKernelTimeMs += M.KernelTimeMs; + Agg.ProcessCount++; + } + + return Agg; +} + +size_t +SubprocessManager::Impl::GetProcessCount() const +{ + RwLock::SharedLockScope $(m_MetricsLock); + return m_AllProcesses.size(); +} + +void +SubprocessManager::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + RwLock::SharedLockScope $(m_MetricsLock); + for (const auto& [Pid, Proc] : m_AllProcesses) + { + Callback(*Proc); + } +} + +ProcessGroup* +SubprocessManager::Impl::CreateGroup(std::string Name) +{ + auto GroupImpl = std::make_unique<ProcessGroup::Impl>(std::move(Name), *this, m_IoContext); + ProcessGroup* Ptr = nullptr; + + auto Group = std::unique_ptr<ProcessGroup>(new ProcessGroup(std::move(GroupImpl))); + Ptr = Group.get(); + + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups[std::string(Ptr->GetName())] = std::move(Group); + + return Ptr; +} + +void +SubprocessManager::Impl::DestroyGroup(std::string_view Name) +{ + RwLock::ExclusiveLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + It->second->KillAll(); + m_Groups.erase(It); + } +} + +ProcessGroup* +SubprocessManager::Impl::FindGroup(std::string_view Name) const +{ + RwLock::SharedLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + return It->second.get(); + } + return nullptr; +} + +void +SubprocessManager::Impl::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const +{ + RwLock::SharedLockScope $(m_GroupsLock); + for (const auto& [Name, Group] : m_Groups) + { + Callback(*Group); + } +} + +// ============================================================================ +// SubprocessManager +// ============================================================================ + +SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config) +: m_Impl(std::make_unique<Impl>(IoContext, Config)) +{ +} + +SubprocessManager::~SubprocessManager() = default; + +ManagedProcess* +SubprocessManager::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("SubprocessManager::Spawn"); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); +} + +ManagedProcess* +SubprocessManager::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("SubprocessManager::Adopt"); + return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); +} + +void +SubprocessManager::Remove(int Pid) +{ + ZEN_TRACE_CPU("SubprocessManager::Remove"); + m_Impl->Remove(Pid); +} + +void +SubprocessManager::RemoveAll() +{ + ZEN_TRACE_CPU("SubprocessManager::RemoveAll"); + m_Impl->RemoveAll(); +} + +void +SubprocessManager::SetDefaultStdoutCallback(ProcessDataCallback Callback) +{ + m_Impl->SetDefaultStdoutCallback(std::move(Callback)); +} + +void +SubprocessManager::SetDefaultStderrCallback(ProcessDataCallback Callback) +{ + m_Impl->SetDefaultStderrCallback(std::move(Callback)); +} + +std::vector<TrackedProcessEntry> +SubprocessManager::GetMetricsSnapshot() const +{ + return m_Impl->GetMetricsSnapshot(); +} + +AggregateProcessMetrics +SubprocessManager::GetAggregateMetrics() const +{ + return m_Impl->GetAggregateMetrics(); +} + +size_t +SubprocessManager::GetProcessCount() const +{ + return m_Impl->GetProcessCount(); +} + +void +SubprocessManager::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + m_Impl->Enumerate(std::move(Callback)); +} + +ProcessGroup* +SubprocessManager::CreateGroup(std::string Name) +{ + ZEN_TRACE_CPU("SubprocessManager::CreateGroup"); + return m_Impl->CreateGroup(std::move(Name)); +} + +void +SubprocessManager::DestroyGroup(std::string_view Name) +{ + ZEN_TRACE_CPU("SubprocessManager::DestroyGroup"); + m_Impl->DestroyGroup(Name); +} + +ProcessGroup* +SubprocessManager::FindGroup(std::string_view Name) const +{ + return m_Impl->FindGroup(Name); +} + +void +SubprocessManager::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const +{ + m_Impl->EnumerateGroups(std::move(Callback)); +} + +// ============================================================================ +// ProcessGroup::Impl +// ============================================================================ + +struct ProcessGroup::Impl +{ + std::string m_Name; + SubprocessManager::Impl& m_Manager; + asio::io_context& m_IoContext; + + mutable RwLock m_Lock; + std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; + +#if ZEN_PLATFORM_WINDOWS + JobObject m_JobObject; +#else + int m_Pgid = 0; +#endif + + Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext); + ~Impl(); + + ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process); + + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + void Remove(int Pid); + void KillAll(); + + AggregateProcessMetrics GetAggregateMetrics() const; + std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + [[nodiscard]] size_t GetProcessCount() const; + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; +}; + +// ============================================================================ +// ProcessGroup::Impl method definitions +// ============================================================================ + +ProcessGroup::Impl::Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext) +: m_Name(std::move(Name)) +, m_Manager(Manager) +, m_IoContext(IoContext) +{ +#if ZEN_PLATFORM_WINDOWS + m_JobObject.Initialize(); +#endif +} + +ProcessGroup::Impl::~Impl() +{ + KillAll(); +} + +ManagedProcess* +ProcessGroup::Impl::AddProcess(std::unique_ptr<ManagedProcess> Process) +{ + int Pid = Process->Pid(); + ManagedProcess* Ptr = Process.get(); + + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + + m_Manager.RegisterForMetrics(Pid, Ptr); + return Ptr; +} + +ManagedProcess* +ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + bool HasStdout = Options.StdoutPipe != nullptr; + bool HasStderr = Options.StderrPipe != nullptr; + +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + Options.AssignToJob = &m_JobObject; + } +#else + if (m_Pgid > 0) + { + Options.ProcessGroupId = m_Pgid; + } +#endif + + CreateProcResult Result = CreateProc(Executable, CommandLine, Options); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); +#if ZEN_PLATFORM_WINDOWS + ImplPtr->m_Handle.Initialize(Result); +#else + int Pid = static_cast<int>(Result); + ImplPtr->m_Handle.Initialize(Pid); + + // First process becomes the group leader + if (m_Pgid == 0) + { + m_Pgid = Pid; + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + if (HasStdout) + { + m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); + } + if (HasStderr) + { + m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); + } + + return Ptr; +} + +ManagedProcess* +ProcessGroup::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + int Pid = Handle.Pid(); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_IoContext); + ImplPtr->m_Handle.Initialize(Pid); + Handle.Reset(); + +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle()); + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = AddProcess(std::move(Proc)); + m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + return Ptr; +} + +void +ProcessGroup::Impl::Remove(int Pid) +{ + m_Manager.UnregisterFromMetrics(Pid); + + RwLock::ExclusiveLockScope $(m_Lock); + auto It = m_Processes.find(Pid); + if (It != m_Processes.end()) + { + It->second->m_Impl->CancelAll(); + m_Processes.erase(It); + } +} + +void +ProcessGroup::Impl::KillAll() +{ +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + TerminateJobObject(static_cast<HANDLE>(m_JobObject.Handle()), 1); + } +#else + if (m_Pgid > 0) + { + kill(-m_Pgid, SIGTERM); + } +#endif + // Also kill individually as fallback and clean up + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + if (Proc->IsRunning()) + { + Proc->Kill(); + } + m_Manager.UnregisterFromMetrics(Pid); + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); +} + +AggregateProcessMetrics +ProcessGroup::Impl::GetAggregateMetrics() const +{ + AggregateProcessMetrics Agg; + + RwLock::SharedLockScope $(m_Lock); + + for (const auto& [Pid, Proc] : m_Processes) + { + const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; + Agg.TotalWorkingSetSize += M.WorkingSetSize; + Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; + Agg.TotalUserTimeMs += M.UserTimeMs; + Agg.TotalKernelTimeMs += M.KernelTimeMs; + Agg.ProcessCount++; + } + + return Agg; +} + +std::vector<TrackedProcessEntry> +ProcessGroup::Impl::GetMetricsSnapshot() const +{ + std::vector<TrackedProcessEntry> Result; + + RwLock::SharedLockScope $(m_Lock); + Result.reserve(m_Processes.size()); + + for (const auto& [Pid, Proc] : m_Processes) + { + TrackedProcessEntry Entry; + Entry.Pid = Pid; + Entry.Metrics = Proc->m_Impl->m_LastMetrics; + Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); + Result.push_back(std::move(Entry)); + } + + return Result; +} + +size_t +ProcessGroup::Impl::GetProcessCount() const +{ + RwLock::SharedLockScope $(m_Lock); + return m_Processes.size(); +} + +void +ProcessGroup::Impl::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + RwLock::SharedLockScope $(m_Lock); + for (const auto& [Pid, Proc] : m_Processes) + { + Callback(*Proc); + } +} + +// ============================================================================ +// ProcessGroup +// ============================================================================ + +ProcessGroup::ProcessGroup(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl)) +{ +} + +ProcessGroup::~ProcessGroup() = default; + +std::string_view +ProcessGroup::GetName() const +{ + return m_Impl->m_Name; +} + +ManagedProcess* +ProcessGroup::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("ProcessGroup::Spawn"); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); +} + +ManagedProcess* +ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + ZEN_TRACE_CPU("ProcessGroup::Adopt"); + return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); +} + +void +ProcessGroup::Remove(int Pid) +{ + ZEN_TRACE_CPU("ProcessGroup::Remove"); + m_Impl->Remove(Pid); +} + +void +ProcessGroup::KillAll() +{ + ZEN_TRACE_CPU("ProcessGroup::KillAll"); + m_Impl->KillAll(); +} + +AggregateProcessMetrics +ProcessGroup::GetAggregateMetrics() const +{ + return m_Impl->GetAggregateMetrics(); +} + +std::vector<TrackedProcessEntry> +ProcessGroup::GetMetricsSnapshot() const +{ + return m_Impl->GetMetricsSnapshot(); +} + +size_t +ProcessGroup::GetProcessCount() const +{ + return m_Impl->GetProcessCount(); +} + +void +ProcessGroup::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + m_Impl->Enumerate(std::move(Callback)); +} + +} // namespace zen + +// ============================================================================ +// Tests +// ============================================================================ + +#if ZEN_WITH_TESTS + +# include <zencore/testing.h> + +# include <chrono> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <asio/io_context.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace zen; +using namespace std::literals; + +void +zen::subprocessmanager_forcelink() +{ +} + +namespace { + +std::filesystem::path +GetAppStubPath() +{ + std::error_code Ec; + std::filesystem::path SelfPath = GetProcessExecutablePath(zen::GetCurrentProcessId(), Ec); + return SelfPath.parent_path() / "zentest-appstub" ZEN_EXE_SUFFIX_LITERAL; +} + +} // namespace + +TEST_SUITE_BEGIN("util.subprocessmanager"); + +TEST_CASE("SubprocessManager.SpawnAndDetectExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=42"; + + int ReceivedExitCode = -1; + bool CallbackFired = false; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { + ReceivedExitCode = ExitCode; + CallbackFired = true; + }); + + IoContext.run_for(5s); + + CHECK(CallbackFired); + CHECK(ReceivedExitCode == 42); +} + +TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string(); + + int ReceivedExitCode = -1; + bool CallbackFired = false; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { + ReceivedExitCode = ExitCode; + CallbackFired = true; + }); + + IoContext.run_for(5s); + + CHECK(CallbackFired); + CHECK(ReceivedExitCode == 0); +} + +TEST_CASE("SubprocessManager.StdoutCapture") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echo=hello_world"; + + StdoutPipeHandles StdoutPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + IoContext.run_for(5s); + + CHECK(Exited); + std::string Captured = Proc->GetCapturedStdout(); + CHECK(Captured.find("hello_world") != std::string::npos); +} + +TEST_CASE("SubprocessManager.StderrCapture") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echoerr=error_msg"; + + StdoutPipeHandles StdoutPipe; + StdoutPipeHandles StderrPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + REQUIRE(CreateOverlappedStdoutPipe(StderrPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + Options.StderrPipe = &StderrPipe; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + IoContext.run_for(5s); + + CHECK(Exited); + std::string CapturedErr = Proc->GetCapturedStderr(); + CHECK(CapturedErr.find("error_msg") != std::string::npos); +} + +TEST_CASE("SubprocessManager.StdoutCallback") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -echo=callback_test"; + + StdoutPipeHandles StdoutPipe; + REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.StdoutPipe = &StdoutPipe; + + std::string ReceivedData; + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); + + IoContext.run_for(5s); + + CHECK(Exited); + CHECK(ReceivedData.find("callback_test") != std::string::npos); + // When a callback is set, accumulated buffer should be empty + CHECK(Proc->GetCapturedStdout().empty()); +} + +TEST_CASE("SubprocessManager.MetricsSampling") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=2"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool Exited = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + + // Run for enough time to get metrics samples + IoContext.run_for(1s); + + ProcessMetrics Metrics = Proc->GetLatestMetrics(); + CHECK(Metrics.WorkingSetSize > 0); + + auto Snapshot = Manager.GetMetricsSnapshot(); + CHECK(Snapshot.size() == 1); + + // Let it finish + IoContext.run_for(3s); + CHECK(Exited); +} + +TEST_CASE("SubprocessManager.RemoveWhileRunning") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=10"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool CallbackFired = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); + + int Pid = Proc->Pid(); + + // Let it start + IoContext.run_for(100ms); + + // Remove without killing — callback should NOT fire after this + Manager.Remove(Pid); + + IoContext.run_for(500ms); + + CHECK_FALSE(CallbackFired); + CHECK(Manager.GetProcessCount() == 0); +} + +TEST_CASE("SubprocessManager.KillAndWaitForExit") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -t=60"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + bool CallbackFired = false; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); + + // Let it start + IoContext.run_for(200ms); + + Proc->Kill(); + + IoContext.run_for(2s); + + CHECK(CallbackFired); +} + +TEST_CASE("SubprocessManager.AdoptProcess") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=7"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + CreateProcResult Result = CreateProc(AppStub, CmdLine, Options); + + int ReceivedExitCode = -1; + + Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; }); + + IoContext.run_for(5s); + + CHECK(ReceivedExitCode == 7); +} + +TEST_CASE("SubprocessManager.UserTag") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + std::filesystem::path AppStub = GetAppStubPath(); + std::string CmdLine = AppStub.string() + " -f=0"; + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string ReceivedTag; + + ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess& P, int) { ReceivedTag = std::string(P.GetTag()); }); + + Proc->SetTag("my-worker-1"); + CHECK(Proc->GetTag() == "my-worker-1"); + + IoContext.run_for(5s); + + CHECK(ReceivedTag == "my-worker-1"); +} + +TEST_CASE("ProcessGroup.SpawnAndMembership") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("test-group"); + REQUIRE(Group != nullptr); + CHECK(Group->GetName() == "test-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine1 = AppStub.string() + " -f=0"; + std::string CmdLine2 = AppStub.string() + " -f=1"; + + Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 2); + + IoContext.run_for(5s); + + CHECK(ExitCount == 2); +} + +TEST_CASE("ProcessGroup.KillAll") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("kill-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + // Let them start + IoContext.run_for(200ms); + CHECK(Group->GetProcessCount() == 2); + + Group->KillAll(); + CHECK(Group->GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.AggregateMetrics") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); + + ProcessGroup* Group = Manager.CreateGroup("metrics-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=3"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + // Wait for metrics sampling + IoContext.run_for(1s); + + AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); + CHECK(GroupAgg.ProcessCount == 2); + CHECK(GroupAgg.TotalWorkingSetSize > 0); + + // Manager-level metrics should include group processes + AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics(); + CHECK(ManagerAgg.ProcessCount == 2); + + Group->KillAll(); +} + +TEST_CASE("ProcessGroup.DestroyGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("destroy-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + IoContext.run_for(200ms); + CHECK(Manager.GetProcessCount() == 2); + + Manager.DestroyGroup("destroy-group"); + + CHECK(Manager.FindGroup("destroy-group") == nullptr); + CHECK(Manager.GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("mixed-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int GroupExitCount = 0; + int UngroupedExitCode = -1; + + std::string CmdLine = AppStub.string() + " -f=0"; + + // Grouped processes + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + + // Ungrouped process + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 3); + + IoContext.run_for(5s); + + CHECK(GroupExitCount == 2); + CHECK(UngroupedExitCode == 0); +} + +TEST_CASE("ProcessGroup.FindGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + CHECK(Manager.FindGroup("nonexistent") == nullptr); + + ProcessGroup* Group = Manager.CreateGroup("findable"); + CHECK(Manager.FindGroup("findable") == Group); + CHECK(Manager.FindGroup("findable")->GetName() == "findable"); +} + +TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) +{ + // Seed for reproducibility — change to explore different orderings + // + // Note that while this is a stress test, it is still single-threaded + + constexpr uint32_t Seed = 42; + std::mt19937 Rng(Seed); + ZEN_INFO("StressTest: seed={}", Seed); + + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 200, .MetricsBatchSize = 32}); + + std::filesystem::path AppStub = GetAppStubPath(); + CreateProcOptions BaseOptions; + BaseOptions.Flags = CreateProcOptions::Flag_NoConsole; + + std::atomic<int> TotalExitCallbacks{0}; + std::atomic<int> KilledGroupProcessCount{0}; + + auto MakeExitCallback = [&](std::atomic<int>& Counter) { + return [&Counter, &TotalExitCallbacks](ManagedProcess&, int) { + Counter++; + TotalExitCallbacks++; + }; + }; + + // ======================================================================== + // Phase 1: Spawn multiple groups with varied workloads + // ======================================================================== + + ZEN_INFO("StressTest: Phase 1 — spawning initial groups"); + + constexpr int NumInitialGroups = 8; + std::vector<std::string> GroupNames; + std::vector<std::atomic<int>> GroupExitCounts(NumInitialGroups); + std::uniform_int_distribution<int> ProcCountDist(5, 100); + std::uniform_int_distribution<int> SleepDist(1, 5); + std::uniform_int_distribution<int> ExitCodeDist(0, 10); + std::uniform_int_distribution<int> WorkloadDist(0, 2); // 0=sleep, 1=exit-code, 2=echo+exit + int TotalPhase1Spawned = 0; + + for (int G = 0; G < NumInitialGroups; G++) + { + std::string GroupName = fmt::format("stress-group-{}", G); + ProcessGroup* Group = Manager.CreateGroup(GroupName); + GroupNames.push_back(GroupName); + + int ProcCount = ProcCountDist(Rng); + for (int P = 0; P < ProcCount; P++) + { + std::string CmdLine; + int Workload = WorkloadDist(Rng); + if (Workload == 0) + { + int Sleep = SleepDist(Rng); + CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); + } + else if (Workload == 1) + { + int Code = ExitCodeDist(Rng); + CmdLine = fmt::format("{} -f={}", AppStub.string(), Code); + } + else + { + int Code = ExitCodeDist(Rng); + CmdLine = fmt::format("{} -echo=stress_g{}_p{} -f={}", AppStub.string(), G, P, Code); + } + + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(GroupExitCounts[G])); + TotalPhase1Spawned++; + } + + ZEN_INFO("StressTest: group '{}' spawned {} processes", GroupName, ProcCount); + } + + ZEN_INFO("StressTest: Phase 1 total spawned: {}", TotalPhase1Spawned); + + // Let processes start running and some short-lived ones exit + IoContext.run_for(1s); + + // ======================================================================== + // Phase 2: Randomly kill some groups, create replacements, add ungrouped + // ======================================================================== + + ZEN_INFO("StressTest: Phase 2 — random group kills and replacements"); + + constexpr int NumGroupsToKill = 3; + + // Pick random groups to kill + std::vector<int> GroupIndices(NumInitialGroups); + std::iota(GroupIndices.begin(), GroupIndices.end(), 0); + std::shuffle(GroupIndices.begin(), GroupIndices.end(), Rng); + + std::vector<int> KilledIndices(GroupIndices.begin(), GroupIndices.begin() + NumGroupsToKill); + + for (int Idx : KilledIndices) + { + ProcessGroup* Group = Manager.FindGroup(GroupNames[Idx]); + if (Group) + { + size_t Count = Group->GetProcessCount(); + ZEN_INFO("StressTest: killing group '{}' ({} processes)", GroupNames[Idx], Count); + Manager.DestroyGroup(GroupNames[Idx]); + } + } + + // Let kills propagate + IoContext.run_for(500ms); + + // Create replacement groups + std::atomic<int> ReplacementExitCount{0}; + std::uniform_int_distribution<int> ReplacementCountDist(3, 10); + + for (int R = 0; R < NumGroupsToKill; R++) + { + std::string Name = fmt::format("replacement-group-{}", R); + ProcessGroup* Group = Manager.CreateGroup(Name); + int Count = ReplacementCountDist(Rng); + + for (int P = 0; P < Count; P++) + { + int Sleep = SleepDist(Rng); + std::string CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ReplacementExitCount)); + } + + ZEN_INFO("StressTest: replacement group '{}' spawned {} processes", Name, Count); + } + + // Also spawn some ungrouped processes + std::atomic<int> UngroupedExitCount{0}; + constexpr int NumUngrouped = 10; + + for (int U = 0; U < NumUngrouped; U++) + { + int ExitCode = ExitCodeDist(Rng); + std::string CmdLine = fmt::format("{} -f={}", AppStub.string(), ExitCode); + Manager.Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(UngroupedExitCount)); + } + + ZEN_INFO("StressTest: spawned {} ungrouped processes", NumUngrouped); + + // Let things run + IoContext.run_for(2s); + + // ======================================================================== + // Phase 3: Rapid spawn/exit churn + // ======================================================================== + + ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn"); + + std::atomic<int> ChurnExitCount{0}; + int TotalChurnSpawned = 0; + constexpr int NumChurnBatches = 10; + std::uniform_int_distribution<int> ChurnBatchSizeDist(10, 20); + + for (int Batch = 0; Batch < NumChurnBatches; Batch++) + { + std::string Name = fmt::format("churn-batch-{}", Batch); + ProcessGroup* Group = Manager.CreateGroup(Name); + int Count = ChurnBatchSizeDist(Rng); + + for (int P = 0; P < Count; P++) + { + // Immediate exit processes to stress spawn/exit path + std::string CmdLine = fmt::format("{} -f=0", AppStub.string()); + Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ChurnExitCount)); + TotalChurnSpawned++; + } + + // Brief pump to allow some exits to be processed + IoContext.run_for(200ms); + + // Destroy the group — any still-running processes get killed + Manager.DestroyGroup(Name); + } + + ZEN_INFO("StressTest: Phase 3 spawned {} churn processes across {} batches", TotalChurnSpawned, NumChurnBatches); + + // ======================================================================== + // Phase 4: Drain and verify + // ======================================================================== + + ZEN_INFO("StressTest: Phase 4 — draining remaining processes"); + + // Check metrics were collected before we wind down + AggregateProcessMetrics Agg = Manager.GetAggregateMetrics(); + ZEN_INFO("StressTest: aggregate metrics: {} processes, {} bytes working set", Agg.ProcessCount, Agg.TotalWorkingSetSize); + + // Let remaining processes finish (replacement groups have up to 5s sleep) + IoContext.run_for(8s); + + // Kill anything still running + Manager.RemoveAll(); + + // Final pump to process any remaining callbacks + IoContext.run_for(1s); + + ZEN_INFO("StressTest: Results:"); + ZEN_INFO("StressTest: total exit callbacks fired: {}", TotalExitCallbacks.load()); + ZEN_INFO("StressTest: ungrouped exits: {}", UngroupedExitCount.load()); + ZEN_INFO("StressTest: replacement exits: {}", ReplacementExitCount.load()); + ZEN_INFO("StressTest: churn exits: {}", ChurnExitCount.load()); + + // Verify the manager is clean + CHECK(Manager.GetProcessCount() == 0); + + // Ungrouped processes should all have exited (they were immediate-exit) + CHECK(UngroupedExitCount.load() == NumUngrouped); + + // Verify we got a reasonable number of total callbacks + // (exact count is hard to predict due to killed groups, but should be > 0) + CHECK(TotalExitCallbacks.load() > 0); + + ZEN_INFO("StressTest: PASSED — seed={}", Seed); +} + +TEST_SUITE_END(); + +#endif |