// Copyright Epic Games, Inc. All Rights Reserved. #include #include "asyncpipereader.h" #include "exitwatcher.h" #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #if ZEN_PLATFORM_WINDOWS # include #else # include #endif #include #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { // ============================================================================ // ManagedProcess::Impl // ============================================================================ struct ManagedProcess::Impl { asio::io_context& m_IoContext; ProcessHandle m_Handle; ProcessExitWatcher m_ExitWatcher; ProcessExitCallback m_ExitCallback; std::atomic m_Exited{false}; // Stdout capture std::unique_ptr m_StdoutReader; ProcessDataCallback m_StdoutCallback; mutable RwLock m_StdoutLock; std::string m_CapturedStdout; // Stderr capture std::unique_ptr m_StderrReader; ProcessDataCallback m_StderrCallback; mutable RwLock m_StderrLock; std::string m_CapturedStderr; // Metrics ProcessMetrics m_LastMetrics; std::atomic m_CpuUsagePercent{-1.0f}; uint64_t m_PrevUserTimeMs = 0; uint64_t m_PrevKernelTimeMs = 0; uint64_t m_PrevSampleTicks = 0; // User tag std::string m_Tag; explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext), m_ExitWatcher(IoContext) {} void OnStdoutData(ManagedProcess& Self, std::string_view Data) { if (m_StdoutCallback) { m_StdoutCallback(Self, Data); } else { RwLock::ExclusiveLockScope $(m_StdoutLock); m_CapturedStdout.append(Data); } } void OnStderrData(ManagedProcess& Self, std::string_view Data) { if (m_StderrCallback) { m_StderrCallback(Self, Data); } else { RwLock::ExclusiveLockScope $(m_StderrLock); m_CapturedStderr.append(Data); } } void SampleMetrics() { if (m_Exited.load()) { return; } ProcessMetrics Metrics; GetProcessMetrics(m_Handle, Metrics); uint64_t NowTicks = GetHifreqTimerValue(); if (m_PrevSampleTicks > 0) { uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - m_PrevSampleTicks); uint64_t DeltaCpuTimeMs = (Metrics.UserTimeMs + Metrics.KernelTimeMs) - (m_PrevUserTimeMs + m_PrevKernelTimeMs); if (ElapsedMs > 0) { m_CpuUsagePercent.store(static_cast(static_cast(DeltaCpuTimeMs) / ElapsedMs * 100.0)); } } m_PrevUserTimeMs = Metrics.UserTimeMs; m_PrevKernelTimeMs = Metrics.KernelTimeMs; m_PrevSampleTicks = NowTicks; m_LastMetrics = Metrics; } [[nodiscard]] int Pid() const { return m_Handle.Pid(); } [[nodiscard]] bool IsRunning() const { return !m_Exited.load() && m_Handle.IsValid() && m_Handle.IsRunning(); } [[nodiscard]] std::string GetCapturedStdout() const { RwLock::SharedLockScope $(m_StdoutLock); return m_CapturedStdout; } [[nodiscard]] std::string GetCapturedStderr() const { RwLock::SharedLockScope $(m_StderrLock); return m_CapturedStderr; } void CancelAll() { m_ExitWatcher.Cancel(); if (m_StdoutReader) { m_StdoutReader->Stop(); } if (m_StderrReader) { m_StderrReader->Stop(); } } }; // ============================================================================ // ManagedProcess // ============================================================================ ManagedProcess::ManagedProcess(std::unique_ptr InImpl) : m_Impl(std::move(InImpl)) { } ManagedProcess::~ManagedProcess() { if (m_Impl) { m_Impl->CancelAll(); } } int ManagedProcess::Pid() const { return m_Impl->Pid(); } bool ManagedProcess::IsRunning() const { return m_Impl->IsRunning(); } const ProcessHandle& ManagedProcess::GetHandle() const { return m_Impl->m_Handle; } ProcessMetrics ManagedProcess::GetLatestMetrics() const { return m_Impl->m_LastMetrics; } float ManagedProcess::GetCpuUsagePercent() const { return m_Impl->m_CpuUsagePercent.load(); } void ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback) { m_Impl->m_StdoutCallback = std::move(Callback); } void ManagedProcess::SetStderrCallback(ProcessDataCallback Callback) { m_Impl->m_StderrCallback = std::move(Callback); } std::string ManagedProcess::GetCapturedStdout() const { return m_Impl->GetCapturedStdout(); } std::string ManagedProcess::GetCapturedStderr() const { return m_Impl->GetCapturedStderr(); } bool ManagedProcess::Kill() { return m_Impl->m_Handle.Kill(); } bool ManagedProcess::Terminate(int ExitCode) { return m_Impl->m_Handle.Terminate(ExitCode); } void ManagedProcess::SetTag(std::string Tag) { m_Impl->m_Tag = std::move(Tag); } std::string_view ManagedProcess::GetTag() const { return m_Impl->m_Tag; } // ============================================================================ // SubprocessManager::Impl // ============================================================================ struct SubprocessManager::Impl { asio::io_context& m_IoContext; SubprocessManagerConfig m_Config; // Ungrouped processes mutable RwLock m_Lock; std::unordered_map> m_Processes; // Groups mutable RwLock m_GroupsLock; std::unordered_map> m_Groups; // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling mutable RwLock m_MetricsLock; std::unordered_map m_AllProcesses; // non-owning std::vector m_KeyOrder; size_t m_NextSampleIndex = 0; ProcessDataCallback m_DefaultStdoutCallback; ProcessDataCallback m_DefaultStderrCallback; std::unique_ptr m_MetricsTimer; std::atomic m_Running{true}; explicit Impl(asio::io_context& IoContext, SubprocessManagerConfig Config); ~Impl(); ManagedProcess* AddProcess(std::unique_ptr Process); void RegisterForMetrics(int Pid, ManagedProcess* Ptr); void UnregisterFromMetrics(int Pid); ManagedProcess* FindProcess(int Pid) const; void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit); void SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); void SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe); ManagedProcess* Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit); ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); void Remove(int Pid); void RemoveAll(); void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); } void SetDefaultStderrCallback(ProcessDataCallback Callback) { m_DefaultStderrCallback = std::move(Callback); } void EnqueueMetricsTimer(); void SampleBatch(); std::vector GetMetricsSnapshot() const; AggregateProcessMetrics GetAggregateMetrics() const; [[nodiscard]] size_t GetProcessCount() const; void Enumerate(std::function Callback) const; ProcessGroup* CreateGroup(std::string Name); void DestroyGroup(std::string_view Name); ProcessGroup* FindGroup(std::string_view Name) const; void EnumerateGroups(std::function Callback) const; }; // ============================================================================ // SubprocessManager::Impl method definitions // ============================================================================ SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConfig Config) : m_IoContext(IoContext), m_Config(Config) { if (m_Config.MetricsSampleIntervalMs > 0) { m_MetricsTimer = std::make_unique(IoContext); EnqueueMetricsTimer(); } } SubprocessManager::Impl::~Impl() { m_Running = false; if (m_MetricsTimer) { m_MetricsTimer->cancel(); } // Destroy groups first (they reference m_Manager back to us) { RwLock::ExclusiveLockScope $(m_GroupsLock); m_Groups.clear(); } RemoveAll(); } ManagedProcess* SubprocessManager::Impl::AddProcess(std::unique_ptr Process) { int Pid = Process->Pid(); ManagedProcess* Ptr = Process.get(); { RwLock::ExclusiveLockScope $(m_Lock); m_Processes[Pid] = std::move(Process); } RegisterForMetrics(Pid, Ptr); return Ptr; } void SubprocessManager::Impl::RegisterForMetrics(int Pid, ManagedProcess* Ptr) { RwLock::ExclusiveLockScope $(m_MetricsLock); m_AllProcesses[Pid] = Ptr; m_KeyOrder.push_back(Pid); } void SubprocessManager::Impl::UnregisterFromMetrics(int Pid) { RwLock::ExclusiveLockScope $(m_MetricsLock); m_AllProcesses.erase(Pid); m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); if (m_NextSampleIndex >= m_KeyOrder.size()) { m_NextSampleIndex = 0; } } ManagedProcess* SubprocessManager::Impl::FindProcess(int Pid) const { RwLock::SharedLockScope $(m_MetricsLock); auto It = m_AllProcesses.find(Pid); if (It != m_AllProcesses.end()) { return It->second; } return nullptr; } void SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit) { int Pid = Proc->Pid(); Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) { ManagedProcess* Found = FindProcess(Pid); if (Found) { Found->m_Impl->m_Exited.store(true); Callback(*Found, ExitCode); } }); } void SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) { int Pid = Proc->Pid(); Proc->m_Impl->m_StdoutReader = std::make_unique(m_IoContext); Proc->m_Impl->m_StdoutReader->Start( std::move(Pipe), [this, Pid](std::string_view Data) { ManagedProcess* Found = FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StdoutCallback) { Found->m_Impl->m_StdoutCallback(*Found, Data); } else if (m_DefaultStdoutCallback) { m_DefaultStdoutCallback(*Found, Data); } else { Found->m_Impl->OnStdoutData(*Found, Data); } } }, [] {}); } void SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandles&& Pipe) { int Pid = Proc->Pid(); Proc->m_Impl->m_StderrReader = std::make_unique(m_IoContext); Proc->m_Impl->m_StderrReader->Start( std::move(Pipe), [this, Pid](std::string_view Data) { ManagedProcess* Found = FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StderrCallback) { Found->m_Impl->m_StderrCallback(*Found, Data); } else if (m_DefaultStderrCallback) { m_DefaultStderrCallback(*Found, Data); } else { Found->m_Impl->OnStderrData(*Found, Data); } } }, [] {}); } ManagedProcess* SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit) { bool HasStdout = Options.StdoutPipe != nullptr; bool HasStderr = Options.StderrPipe != nullptr; CreateProcResult Result = CreateProc(Executable, CommandLine, Options); auto ImplPtr = std::make_unique(m_IoContext); #if ZEN_PLATFORM_WINDOWS ImplPtr->m_Handle.Initialize(Result); #else ImplPtr->m_Handle.Initialize(static_cast(Result)); #endif auto Proc = std::unique_ptr(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); SetupExitWatcher(Ptr, std::move(OnExit)); if (HasStdout) { SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); } if (HasStderr) { SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); } return Ptr; } ManagedProcess* SubprocessManager::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) { int Pid = Handle.Pid(); auto ImplPtr = std::make_unique(m_IoContext); ImplPtr->m_Handle.Initialize(Pid); // Reset the original handle so caller doesn't double-close Handle.Reset(); auto Proc = std::unique_ptr(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); SetupExitWatcher(Ptr, std::move(OnExit)); return Ptr; } void SubprocessManager::Impl::Remove(int Pid) { UnregisterFromMetrics(Pid); RwLock::ExclusiveLockScope $(m_Lock); auto It = m_Processes.find(Pid); if (It != m_Processes.end()) { It->second->m_Impl->CancelAll(); m_Processes.erase(It); } } void SubprocessManager::Impl::RemoveAll() { { RwLock::ExclusiveLockScope $(m_Lock); for (auto& [Pid, Proc] : m_Processes) { Proc->m_Impl->CancelAll(); } m_Processes.clear(); } { RwLock::ExclusiveLockScope $(m_MetricsLock); m_AllProcesses.clear(); m_KeyOrder.clear(); m_NextSampleIndex = 0; } } void SubprocessManager::Impl::EnqueueMetricsTimer() { if (!m_MetricsTimer || !m_Running.load()) { return; } m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs)); m_MetricsTimer->async_wait([this](const asio::error_code& Ec) { if (Ec || !m_Running.load()) { return; } SampleBatch(); EnqueueMetricsTimer(); }); } void SubprocessManager::Impl::SampleBatch() { RwLock::SharedLockScope $(m_MetricsLock); if (m_KeyOrder.empty()) { return; } size_t Remaining = std::min(static_cast(m_Config.MetricsBatchSize), m_KeyOrder.size()); while (Remaining > 0) { if (m_NextSampleIndex >= m_KeyOrder.size()) { m_NextSampleIndex = 0; } int Pid = m_KeyOrder[m_NextSampleIndex]; auto It = m_AllProcesses.find(Pid); if (It != m_AllProcesses.end()) { It->second->m_Impl->SampleMetrics(); } m_NextSampleIndex++; Remaining--; } } std::vector SubprocessManager::Impl::GetMetricsSnapshot() const { std::vector Result; RwLock::SharedLockScope $(m_MetricsLock); Result.reserve(m_AllProcesses.size()); for (const auto& [Pid, Proc] : m_AllProcesses) { TrackedProcessEntry Entry; Entry.Pid = Pid; Entry.Metrics = Proc->m_Impl->m_LastMetrics; Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); Result.push_back(std::move(Entry)); } return Result; } AggregateProcessMetrics SubprocessManager::Impl::GetAggregateMetrics() const { AggregateProcessMetrics Agg; RwLock::SharedLockScope $(m_MetricsLock); for (const auto& [Pid, Proc] : m_AllProcesses) { const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; Agg.TotalWorkingSetSize += M.WorkingSetSize; Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; Agg.TotalUserTimeMs += M.UserTimeMs; Agg.TotalKernelTimeMs += M.KernelTimeMs; Agg.ProcessCount++; } return Agg; } size_t SubprocessManager::Impl::GetProcessCount() const { RwLock::SharedLockScope $(m_MetricsLock); return m_AllProcesses.size(); } void SubprocessManager::Impl::Enumerate(std::function Callback) const { RwLock::SharedLockScope $(m_MetricsLock); for (const auto& [Pid, Proc] : m_AllProcesses) { Callback(*Proc); } } ProcessGroup* SubprocessManager::Impl::CreateGroup(std::string Name) { auto GroupImpl = std::make_unique(std::move(Name), *this, m_IoContext); ProcessGroup* Ptr = nullptr; auto Group = std::unique_ptr(new ProcessGroup(std::move(GroupImpl))); Ptr = Group.get(); RwLock::ExclusiveLockScope $(m_GroupsLock); m_Groups[std::string(Ptr->GetName())] = std::move(Group); return Ptr; } void SubprocessManager::Impl::DestroyGroup(std::string_view Name) { RwLock::ExclusiveLockScope $(m_GroupsLock); auto It = m_Groups.find(std::string(Name)); if (It != m_Groups.end()) { It->second->KillAll(); m_Groups.erase(It); } } ProcessGroup* SubprocessManager::Impl::FindGroup(std::string_view Name) const { RwLock::SharedLockScope $(m_GroupsLock); auto It = m_Groups.find(std::string(Name)); if (It != m_Groups.end()) { return It->second.get(); } return nullptr; } void SubprocessManager::Impl::EnumerateGroups(std::function Callback) const { RwLock::SharedLockScope $(m_GroupsLock); for (const auto& [Name, Group] : m_Groups) { Callback(*Group); } } // ============================================================================ // SubprocessManager // ============================================================================ SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config) : m_Impl(std::make_unique(IoContext, Config)) { } SubprocessManager::~SubprocessManager() = default; ManagedProcess* SubprocessManager::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit) { ZEN_TRACE_CPU("SubprocessManager::Spawn"); return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); } ManagedProcess* SubprocessManager::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) { ZEN_TRACE_CPU("SubprocessManager::Adopt"); return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); } void SubprocessManager::Remove(int Pid) { ZEN_TRACE_CPU("SubprocessManager::Remove"); m_Impl->Remove(Pid); } void SubprocessManager::RemoveAll() { ZEN_TRACE_CPU("SubprocessManager::RemoveAll"); m_Impl->RemoveAll(); } void SubprocessManager::SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_Impl->SetDefaultStdoutCallback(std::move(Callback)); } void SubprocessManager::SetDefaultStderrCallback(ProcessDataCallback Callback) { m_Impl->SetDefaultStderrCallback(std::move(Callback)); } std::vector SubprocessManager::GetMetricsSnapshot() const { return m_Impl->GetMetricsSnapshot(); } AggregateProcessMetrics SubprocessManager::GetAggregateMetrics() const { return m_Impl->GetAggregateMetrics(); } size_t SubprocessManager::GetProcessCount() const { return m_Impl->GetProcessCount(); } void SubprocessManager::Enumerate(std::function Callback) const { m_Impl->Enumerate(std::move(Callback)); } ProcessGroup* SubprocessManager::CreateGroup(std::string Name) { ZEN_TRACE_CPU("SubprocessManager::CreateGroup"); return m_Impl->CreateGroup(std::move(Name)); } void SubprocessManager::DestroyGroup(std::string_view Name) { ZEN_TRACE_CPU("SubprocessManager::DestroyGroup"); m_Impl->DestroyGroup(Name); } ProcessGroup* SubprocessManager::FindGroup(std::string_view Name) const { return m_Impl->FindGroup(Name); } void SubprocessManager::EnumerateGroups(std::function Callback) const { m_Impl->EnumerateGroups(std::move(Callback)); } // ============================================================================ // ProcessGroup::Impl // ============================================================================ struct ProcessGroup::Impl { std::string m_Name; SubprocessManager::Impl& m_Manager; asio::io_context& m_IoContext; mutable RwLock m_Lock; std::unordered_map> m_Processes; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #else int m_Pgid = 0; #endif Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext); ~Impl(); ManagedProcess* AddProcess(std::unique_ptr Process); ManagedProcess* Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit); ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); void Remove(int Pid); void KillAll(); AggregateProcessMetrics GetAggregateMetrics() const; std::vector GetMetricsSnapshot() const; [[nodiscard]] size_t GetProcessCount() const; void Enumerate(std::function Callback) const; }; // ============================================================================ // ProcessGroup::Impl method definitions // ============================================================================ ProcessGroup::Impl::Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext) : m_Name(std::move(Name)) , m_Manager(Manager) , m_IoContext(IoContext) { #if ZEN_PLATFORM_WINDOWS m_JobObject.Initialize(); #endif } ProcessGroup::Impl::~Impl() { KillAll(); } ManagedProcess* ProcessGroup::Impl::AddProcess(std::unique_ptr Process) { int Pid = Process->Pid(); ManagedProcess* Ptr = Process.get(); { RwLock::ExclusiveLockScope $(m_Lock); m_Processes[Pid] = std::move(Process); } m_Manager.RegisterForMetrics(Pid, Ptr); return Ptr; } ManagedProcess* ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit) { bool HasStdout = Options.StdoutPipe != nullptr; bool HasStderr = Options.StderrPipe != nullptr; #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { Options.AssignToJob = &m_JobObject; } #else if (m_Pgid > 0) { Options.ProcessGroupId = m_Pgid; } #endif CreateProcResult Result = CreateProc(Executable, CommandLine, Options); auto ImplPtr = std::make_unique(m_IoContext); #if ZEN_PLATFORM_WINDOWS ImplPtr->m_Handle.Initialize(Result); #else int Pid = static_cast(Result); ImplPtr->m_Handle.Initialize(Pid); // First process becomes the group leader if (m_Pgid == 0) { m_Pgid = Pid; } #endif auto Proc = std::unique_ptr(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); if (HasStdout) { m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); } if (HasStderr) { m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); } return Ptr; } ManagedProcess* ProcessGroup::Impl::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) { int Pid = Handle.Pid(); auto ImplPtr = std::make_unique(m_IoContext); ImplPtr->m_Handle.Initialize(Pid); Handle.Reset(); #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle()); } #endif auto Proc = std::unique_ptr(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); return Ptr; } void ProcessGroup::Impl::Remove(int Pid) { m_Manager.UnregisterFromMetrics(Pid); RwLock::ExclusiveLockScope $(m_Lock); auto It = m_Processes.find(Pid); if (It != m_Processes.end()) { It->second->m_Impl->CancelAll(); m_Processes.erase(It); } } void ProcessGroup::Impl::KillAll() { #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { TerminateJobObject(static_cast(m_JobObject.Handle()), 1); } #else if (m_Pgid > 0) { kill(-m_Pgid, SIGTERM); } #endif // Also kill individually as fallback and clean up RwLock::ExclusiveLockScope $(m_Lock); for (auto& [Pid, Proc] : m_Processes) { if (Proc->IsRunning()) { Proc->Kill(); } m_Manager.UnregisterFromMetrics(Pid); Proc->m_Impl->CancelAll(); } m_Processes.clear(); } AggregateProcessMetrics ProcessGroup::Impl::GetAggregateMetrics() const { AggregateProcessMetrics Agg; RwLock::SharedLockScope $(m_Lock); for (const auto& [Pid, Proc] : m_Processes) { const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; Agg.TotalWorkingSetSize += M.WorkingSetSize; Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; Agg.TotalUserTimeMs += M.UserTimeMs; Agg.TotalKernelTimeMs += M.KernelTimeMs; Agg.ProcessCount++; } return Agg; } std::vector ProcessGroup::Impl::GetMetricsSnapshot() const { std::vector Result; RwLock::SharedLockScope $(m_Lock); Result.reserve(m_Processes.size()); for (const auto& [Pid, Proc] : m_Processes) { TrackedProcessEntry Entry; Entry.Pid = Pid; Entry.Metrics = Proc->m_Impl->m_LastMetrics; Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); Result.push_back(std::move(Entry)); } return Result; } size_t ProcessGroup::Impl::GetProcessCount() const { RwLock::SharedLockScope $(m_Lock); return m_Processes.size(); } void ProcessGroup::Impl::Enumerate(std::function Callback) const { RwLock::SharedLockScope $(m_Lock); for (const auto& [Pid, Proc] : m_Processes) { Callback(*Proc); } } // ============================================================================ // ProcessGroup // ============================================================================ ProcessGroup::ProcessGroup(std::unique_ptr InImpl) : m_Impl(std::move(InImpl)) { } ProcessGroup::~ProcessGroup() = default; std::string_view ProcessGroup::GetName() const { return m_Impl->m_Name; } ManagedProcess* ProcessGroup::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, ProcessExitCallback OnExit) { ZEN_TRACE_CPU("ProcessGroup::Spawn"); return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); } ManagedProcess* ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) { ZEN_TRACE_CPU("ProcessGroup::Adopt"); return m_Impl->Adopt(std::move(Handle), std::move(OnExit)); } void ProcessGroup::Remove(int Pid) { ZEN_TRACE_CPU("ProcessGroup::Remove"); m_Impl->Remove(Pid); } void ProcessGroup::KillAll() { ZEN_TRACE_CPU("ProcessGroup::KillAll"); m_Impl->KillAll(); } AggregateProcessMetrics ProcessGroup::GetAggregateMetrics() const { return m_Impl->GetAggregateMetrics(); } std::vector ProcessGroup::GetMetricsSnapshot() const { return m_Impl->GetMetricsSnapshot(); } size_t ProcessGroup::GetProcessCount() const { return m_Impl->GetProcessCount(); } void ProcessGroup::Enumerate(std::function Callback) const { m_Impl->Enumerate(std::move(Callback)); } } // namespace zen // ============================================================================ // Tests // ============================================================================ #if ZEN_WITH_TESTS # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END using namespace zen; using namespace std::literals; void zen::subprocessmanager_forcelink() { } namespace { std::filesystem::path GetAppStubPath() { std::error_code Ec; std::filesystem::path SelfPath = GetProcessExecutablePath(zen::GetCurrentProcessId(), Ec); return SelfPath.parent_path() / "zentest-appstub" ZEN_EXE_SUFFIX_LITERAL; } } // namespace TEST_SUITE_BEGIN("util.subprocessmanager"); TEST_CASE("SubprocessManager.SpawnAndDetectExit") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -f=42"; int ReceivedExitCode = -1; bool CallbackFired = false; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; CallbackFired = true; }); IoContext.run_for(5s); CHECK(CallbackFired); CHECK(ReceivedExitCode == 42); } TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string(); int ReceivedExitCode = -1; bool CallbackFired = false; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; CallbackFired = true; }); IoContext.run_for(5s); CHECK(CallbackFired); CHECK(ReceivedExitCode == 0); } TEST_CASE("SubprocessManager.StdoutCapture") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -echo=hello_world"; StdoutPipeHandles StdoutPipe; REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; Options.StdoutPipe = &StdoutPipe; bool Exited = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); IoContext.run_for(5s); CHECK(Exited); std::string Captured = Proc->GetCapturedStdout(); CHECK(Captured.find("hello_world") != std::string::npos); } TEST_CASE("SubprocessManager.StderrCapture") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -echoerr=error_msg"; StdoutPipeHandles StdoutPipe; StdoutPipeHandles StderrPipe; REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); REQUIRE(CreateOverlappedStdoutPipe(StderrPipe)); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; Options.StdoutPipe = &StdoutPipe; Options.StderrPipe = &StderrPipe; bool Exited = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); IoContext.run_for(5s); CHECK(Exited); std::string CapturedErr = Proc->GetCapturedStderr(); CHECK(CapturedErr.find("error_msg") != std::string::npos); } TEST_CASE("SubprocessManager.StdoutCallback") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -echo=callback_test"; StdoutPipeHandles StdoutPipe; REQUIRE(CreateOverlappedStdoutPipe(StdoutPipe)); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; Options.StdoutPipe = &StdoutPipe; std::string ReceivedData; bool Exited = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); IoContext.run_for(5s); CHECK(Exited); CHECK(ReceivedData.find("callback_test") != std::string::npos); // When a callback is set, accumulated buffer should be empty CHECK(Proc->GetCapturedStdout().empty()); } TEST_CASE("SubprocessManager.MetricsSampling") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -t=2"; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; bool Exited = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); // Run for enough time to get metrics samples IoContext.run_for(1s); ProcessMetrics Metrics = Proc->GetLatestMetrics(); CHECK(Metrics.WorkingSetSize > 0); auto Snapshot = Manager.GetMetricsSnapshot(); CHECK(Snapshot.size() == 1); // Let it finish IoContext.run_for(3s); CHECK(Exited); } TEST_CASE("SubprocessManager.RemoveWhileRunning") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -t=10"; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; bool CallbackFired = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); int Pid = Proc->Pid(); // Let it start IoContext.run_for(100ms); // Remove without killing — callback should NOT fire after this Manager.Remove(Pid); IoContext.run_for(500ms); CHECK_FALSE(CallbackFired); CHECK(Manager.GetProcessCount() == 0); } TEST_CASE("SubprocessManager.KillAndWaitForExit") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -t=60"; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; bool CallbackFired = false; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); // Let it start IoContext.run_for(200ms); Proc->Kill(); IoContext.run_for(2s); CHECK(CallbackFired); } TEST_CASE("SubprocessManager.AdoptProcess") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -f=7"; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; CreateProcResult Result = CreateProc(AppStub, CmdLine, Options); int ReceivedExitCode = -1; Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; }); IoContext.run_for(5s); CHECK(ReceivedExitCode == 7); } TEST_CASE("SubprocessManager.UserTag") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); std::filesystem::path AppStub = GetAppStubPath(); std::string CmdLine = AppStub.string() + " -f=0"; CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; std::string ReceivedTag; ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess& P, int) { ReceivedTag = std::string(P.GetTag()); }); Proc->SetTag("my-worker-1"); CHECK(Proc->GetTag() == "my-worker-1"); IoContext.run_for(5s); CHECK(ReceivedTag == "my-worker-1"); } TEST_CASE("ProcessGroup.SpawnAndMembership") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); ProcessGroup* Group = Manager.CreateGroup("test-group"); REQUIRE(Group != nullptr); CHECK(Group->GetName() == "test-group"); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; int ExitCount = 0; std::string CmdLine1 = AppStub.string() + " -f=0"; std::string CmdLine2 = AppStub.string() + " -f=1"; Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; }); Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; }); CHECK(Group->GetProcessCount() == 2); CHECK(Manager.GetProcessCount() == 2); IoContext.run_for(5s); CHECK(ExitCount == 2); } TEST_CASE("ProcessGroup.KillAll") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); ProcessGroup* Group = Manager.CreateGroup("kill-group"); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; int ExitCount = 0; std::string CmdLine = AppStub.string() + " -t=60"; Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); // Let them start IoContext.run_for(200ms); CHECK(Group->GetProcessCount() == 2); Group->KillAll(); CHECK(Group->GetProcessCount() == 0); } TEST_CASE("ProcessGroup.AggregateMetrics") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); ProcessGroup* Group = Manager.CreateGroup("metrics-group"); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; std::string CmdLine = AppStub.string() + " -t=3"; Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); // Wait for metrics sampling IoContext.run_for(1s); AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); CHECK(GroupAgg.ProcessCount == 2); CHECK(GroupAgg.TotalWorkingSetSize > 0); // Manager-level metrics should include group processes AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics(); CHECK(ManagerAgg.ProcessCount == 2); Group->KillAll(); } TEST_CASE("ProcessGroup.DestroyGroup") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); ProcessGroup* Group = Manager.CreateGroup("destroy-group"); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; std::string CmdLine = AppStub.string() + " -t=60"; Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); IoContext.run_for(200ms); CHECK(Manager.GetProcessCount() == 2); Manager.DestroyGroup("destroy-group"); CHECK(Manager.FindGroup("destroy-group") == nullptr); CHECK(Manager.GetProcessCount() == 0); } TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); ProcessGroup* Group = Manager.CreateGroup("mixed-group"); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions Options; Options.Flags = CreateProcOptions::Flag_NoConsole; int GroupExitCount = 0; int UngroupedExitCode = -1; std::string CmdLine = AppStub.string() + " -f=0"; // Grouped processes Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); // Ungrouped process Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; }); CHECK(Group->GetProcessCount() == 2); CHECK(Manager.GetProcessCount() == 3); IoContext.run_for(5s); CHECK(GroupExitCount == 2); CHECK(UngroupedExitCode == 0); } TEST_CASE("ProcessGroup.FindGroup") { asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); CHECK(Manager.FindGroup("nonexistent") == nullptr); ProcessGroup* Group = Manager.CreateGroup("findable"); CHECK(Manager.FindGroup("findable") == Group); CHECK(Manager.FindGroup("findable")->GetName() == "findable"); } TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) { // Seed for reproducibility — change to explore different orderings // // Note that while this is a stress test, it is still single-threaded constexpr uint32_t Seed = 42; std::mt19937 Rng(Seed); ZEN_INFO("StressTest: seed={}", Seed); asio::io_context IoContext; SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 200, .MetricsBatchSize = 32}); std::filesystem::path AppStub = GetAppStubPath(); CreateProcOptions BaseOptions; BaseOptions.Flags = CreateProcOptions::Flag_NoConsole; std::atomic TotalExitCallbacks{0}; std::atomic KilledGroupProcessCount{0}; auto MakeExitCallback = [&](std::atomic& Counter) { return [&Counter, &TotalExitCallbacks](ManagedProcess&, int) { Counter++; TotalExitCallbacks++; }; }; // ======================================================================== // Phase 1: Spawn multiple groups with varied workloads // ======================================================================== ZEN_INFO("StressTest: Phase 1 — spawning initial groups"); constexpr int NumInitialGroups = 8; std::vector GroupNames; std::vector> GroupExitCounts(NumInitialGroups); std::uniform_int_distribution ProcCountDist(5, 100); std::uniform_int_distribution SleepDist(1, 5); std::uniform_int_distribution ExitCodeDist(0, 10); std::uniform_int_distribution WorkloadDist(0, 2); // 0=sleep, 1=exit-code, 2=echo+exit int TotalPhase1Spawned = 0; for (int G = 0; G < NumInitialGroups; G++) { std::string GroupName = fmt::format("stress-group-{}", G); ProcessGroup* Group = Manager.CreateGroup(GroupName); GroupNames.push_back(GroupName); int ProcCount = ProcCountDist(Rng); for (int P = 0; P < ProcCount; P++) { std::string CmdLine; int Workload = WorkloadDist(Rng); if (Workload == 0) { int Sleep = SleepDist(Rng); CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); } else if (Workload == 1) { int Code = ExitCodeDist(Rng); CmdLine = fmt::format("{} -f={}", AppStub.string(), Code); } else { int Code = ExitCodeDist(Rng); CmdLine = fmt::format("{} -echo=stress_g{}_p{} -f={}", AppStub.string(), G, P, Code); } Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(GroupExitCounts[G])); TotalPhase1Spawned++; } ZEN_INFO("StressTest: group '{}' spawned {} processes", GroupName, ProcCount); } ZEN_INFO("StressTest: Phase 1 total spawned: {}", TotalPhase1Spawned); // Let processes start running and some short-lived ones exit IoContext.run_for(1s); // ======================================================================== // Phase 2: Randomly kill some groups, create replacements, add ungrouped // ======================================================================== ZEN_INFO("StressTest: Phase 2 — random group kills and replacements"); constexpr int NumGroupsToKill = 3; // Pick random groups to kill std::vector GroupIndices(NumInitialGroups); std::iota(GroupIndices.begin(), GroupIndices.end(), 0); std::shuffle(GroupIndices.begin(), GroupIndices.end(), Rng); std::vector KilledIndices(GroupIndices.begin(), GroupIndices.begin() + NumGroupsToKill); for (int Idx : KilledIndices) { ProcessGroup* Group = Manager.FindGroup(GroupNames[Idx]); if (Group) { size_t Count = Group->GetProcessCount(); ZEN_INFO("StressTest: killing group '{}' ({} processes)", GroupNames[Idx], Count); Manager.DestroyGroup(GroupNames[Idx]); } } // Let kills propagate IoContext.run_for(500ms); // Create replacement groups std::atomic ReplacementExitCount{0}; std::uniform_int_distribution ReplacementCountDist(3, 10); for (int R = 0; R < NumGroupsToKill; R++) { std::string Name = fmt::format("replacement-group-{}", R); ProcessGroup* Group = Manager.CreateGroup(Name); int Count = ReplacementCountDist(Rng); for (int P = 0; P < Count; P++) { int Sleep = SleepDist(Rng); std::string CmdLine = fmt::format("{} -t={}", AppStub.string(), Sleep); Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ReplacementExitCount)); } ZEN_INFO("StressTest: replacement group '{}' spawned {} processes", Name, Count); } // Also spawn some ungrouped processes std::atomic UngroupedExitCount{0}; constexpr int NumUngrouped = 10; for (int U = 0; U < NumUngrouped; U++) { int ExitCode = ExitCodeDist(Rng); std::string CmdLine = fmt::format("{} -f={}", AppStub.string(), ExitCode); Manager.Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(UngroupedExitCount)); } ZEN_INFO("StressTest: spawned {} ungrouped processes", NumUngrouped); // Let things run IoContext.run_for(2s); // ======================================================================== // Phase 3: Rapid spawn/exit churn // ======================================================================== ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn"); std::atomic ChurnExitCount{0}; int TotalChurnSpawned = 0; constexpr int NumChurnBatches = 10; std::uniform_int_distribution ChurnBatchSizeDist(10, 20); for (int Batch = 0; Batch < NumChurnBatches; Batch++) { std::string Name = fmt::format("churn-batch-{}", Batch); ProcessGroup* Group = Manager.CreateGroup(Name); int Count = ChurnBatchSizeDist(Rng); for (int P = 0; P < Count; P++) { // Immediate exit processes to stress spawn/exit path std::string CmdLine = fmt::format("{} -f=0", AppStub.string()); Group->Spawn(AppStub, CmdLine, BaseOptions, MakeExitCallback(ChurnExitCount)); TotalChurnSpawned++; } // Brief pump to allow some exits to be processed IoContext.run_for(200ms); // Destroy the group — any still-running processes get killed Manager.DestroyGroup(Name); } ZEN_INFO("StressTest: Phase 3 spawned {} churn processes across {} batches", TotalChurnSpawned, NumChurnBatches); // ======================================================================== // Phase 4: Drain and verify // ======================================================================== ZEN_INFO("StressTest: Phase 4 — draining remaining processes"); // Check metrics were collected before we wind down AggregateProcessMetrics Agg = Manager.GetAggregateMetrics(); ZEN_INFO("StressTest: aggregate metrics: {} processes, {} bytes working set", Agg.ProcessCount, Agg.TotalWorkingSetSize); // Let remaining processes finish (replacement groups have up to 5s sleep) IoContext.run_for(8s); // Kill anything still running Manager.RemoveAll(); // Final pump to process any remaining callbacks IoContext.run_for(1s); ZEN_INFO("StressTest: Results:"); ZEN_INFO("StressTest: total exit callbacks fired: {}", TotalExitCallbacks.load()); ZEN_INFO("StressTest: ungrouped exits: {}", UngroupedExitCount.load()); ZEN_INFO("StressTest: replacement exits: {}", ReplacementExitCount.load()); ZEN_INFO("StressTest: churn exits: {}", ChurnExitCount.load()); // Verify the manager is clean CHECK(Manager.GetProcessCount() == 0); // Ungrouped processes should all have exited (they were immediate-exit) CHECK(UngroupedExitCount.load() == NumUngrouped); // Verify we got a reasonable number of total callbacks // (exact count is hard to predict due to killed groups, but should be > 0) CHECK(TotalExitCallbacks.load() > 0); ZEN_INFO("StressTest: PASSED — seed={}", Seed); } TEST_SUITE_END(); #endif