diff options
| author | Stefan Boberg <[email protected]> | 2026-03-23 23:27:24 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-03-23 23:27:24 +0100 |
| commit | 65da422ad9cc5339a17252e30a0e88d757e21586 (patch) | |
| tree | 2af1ba57474ed2e48d0c8d131857153588b28551 | |
| parent | updated comment (diff) | |
| download | zen-sb/proc1000.tar.xz zen-sb/proc1000.zip | |
Add ProcessGroup for OS-backed process groupingsb/proc1000
Introduces ProcessGroup as a first-class object for managing related
processes together. On Windows, groups are backed by a JobObject with
kill-on-close guarantees. On Linux/macOS, groups use setpgid() for bulk
signal delivery via kill(-pgid).
Groups provide per-group metrics aggregation, bulk KillAll, and
independent Spawn/Adopt/Remove. The SubprocessManager metrics timer
samples across both grouped and ungrouped processes via a unified index.
| -rw-r--r-- | src/zencore/include/zencore/process.h | 12 | ||||
| -rw-r--r-- | src/zencore/process.cpp | 5 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/process/subprocessmanager.h | 79 | ||||
| -rw-r--r-- | src/zenutil/process/subprocessmanager.cpp | 641 |
4 files changed, 677 insertions, 60 deletions
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index d115bf11f..ea21a74de 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -171,6 +171,11 @@ struct CreateProcOptions #if ZEN_PLATFORM_WINDOWS JobObject* AssignToJob = nullptr; // When set, the process is created suspended, assigned to the job, then resumed +#else + /// POSIX process group id. When > 0, the child is placed into this process + /// group via setpgid() before exec. Use the pid of the first child as the + /// pgid to create a group, then pass the same pgid for subsequent children. + int ProcessGroupId = 0; #endif }; @@ -224,9 +229,10 @@ public: JobObject(const JobObject&) = delete; JobObject& operator=(const JobObject&) = delete; - void Initialize(); - bool AssignProcess(void* ProcessHandle); - [[nodiscard]] bool IsValid() const; + void Initialize(); + bool AssignProcess(void* ProcessHandle); + [[nodiscard]] bool IsValid() const; + [[nodiscard]] void* Handle() const { return m_JobHandle; } private: void* m_JobHandle = nullptr; diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index dcb8b2422..03eef2bc7 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -1070,6 +1070,11 @@ CreateProc(const std::filesystem::path& Executable, std::string_view CommandLine } } + if (Options.ProcessGroupId > 0) + { + setpgid(0, Options.ProcessGroupId); + } + for (const auto& [Key, Value] : Options.Environment) { setenv(Key.c_str(), Value.c_str(), 1); diff --git a/src/zenutil/include/zenutil/process/subprocessmanager.h b/src/zenutil/include/zenutil/process/subprocessmanager.h index 91b887449..f241a4c3e 100644 --- a/src/zenutil/include/zenutil/process/subprocessmanager.h +++ b/src/zenutil/include/zenutil/process/subprocessmanager.h @@ -21,6 +21,7 @@ class io_context; namespace zen { class ManagedProcess; +class ProcessGroup; /// Callback invoked when a managed process exits. using ProcessExitCallback = std::function<void(ManagedProcess& Process, int ExitCode)>; @@ -107,7 +108,23 @@ public: /// Enumerate all managed processes under a shared lock. void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + /// Create a new process group. The group is owned by this manager. + /// On Windows the group is backed by a JobObject (kill-on-close guarantee). + /// On POSIX the group uses setpgid for bulk signal delivery. + ProcessGroup* CreateGroup(std::string Name); + + /// Destroy a group by name. Kills all processes in the group first. + void DestroyGroup(std::string_view Name); + + /// Find a group by name. Returns nullptr if not found. + [[nodiscard]] ProcessGroup* FindGroup(std::string_view Name) const; + + /// Enumerate all groups. + void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const; + private: + friend class ProcessGroup; + struct Impl; std::unique_ptr<Impl> m_Impl; }; @@ -167,6 +184,7 @@ public: private: friend class SubprocessManager; + friend class ProcessGroup; struct Impl; std::unique_ptr<Impl> m_Impl; @@ -174,6 +192,67 @@ private: explicit ManagedProcess(std::unique_ptr<Impl> InImpl); }; +/// A group of managed processes with OS-level backing. +/// +/// On Windows: backed by a JobObject. All processes assigned on spawn. +/// Kill-on-close guarantee — if the group is destroyed, the OS terminates +/// all member processes. +/// On Linux/macOS: uses setpgid() so children share a process group. +/// Enables bulk signal delivery via kill(-pgid, sig). +/// +/// Created via SubprocessManager::CreateGroup(). Not user-constructible. +class ProcessGroup +{ +public: + ~ProcessGroup(); + + ProcessGroup(const ProcessGroup&) = delete; + ProcessGroup& operator=(const ProcessGroup&) = delete; + + /// Group name (as passed to CreateGroup). + [[nodiscard]] std::string_view GetName() const; + + /// Spawn a process into this group. + ManagedProcess* Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit); + + /// Adopt an already-running process into this group. + /// On Windows the process is assigned to the group's JobObject. + /// On POSIX the process cannot be moved into a different process group + /// after creation, so OS-level grouping is best-effort for adopted processes. + ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); + + /// Remove a process from this group. Does NOT kill it. + void Remove(int Pid); + + /// Kill all processes in the group. + /// On Windows: uses TerminateJobObject for atomic group kill. + /// On POSIX: sends SIGTERM then SIGKILL to the process group. + void KillAll(); + + /// Aggregate metrics for this group's processes. + [[nodiscard]] AggregateProcessMetrics GetAggregateMetrics() const; + + /// Per-process metrics snapshot for this group. + [[nodiscard]] std::vector<TrackedProcessEntry> GetMetricsSnapshot() const; + + /// Number of processes in this group. + [[nodiscard]] size_t GetProcessCount() const; + + /// Enumerate processes in this group. + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const; + +private: + friend class SubprocessManager; + + struct Impl; + std::unique_ptr<Impl> m_Impl; + + explicit ProcessGroup(std::unique_ptr<Impl> InImpl); +}; + void subprocessmanager_forcelink(); // internal } // namespace zen diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp index b49f0360b..155327dd3 100644 --- a/src/zenutil/process/subprocessmanager.cpp +++ b/src/zenutil/process/subprocessmanager.cpp @@ -15,6 +15,11 @@ #include <vector> ZEN_THIRD_PARTY_INCLUDES_START +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#else +# include <csignal> +#endif #include <asio/io_context.hpp> #include <asio/steady_timer.hpp> ZEN_THIRD_PARTY_INCLUDES_END @@ -244,10 +249,19 @@ struct SubprocessManager::Impl asio::io_context& m_IoContext; SubprocessManagerConfig m_Config; + // Ungrouped processes mutable RwLock m_Lock; std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; - std::vector<int> m_KeyOrder; - size_t m_NextSampleIndex = 0; + + // Groups + mutable RwLock m_GroupsLock; + std::unordered_map<std::string, std::unique_ptr<ProcessGroup>> m_Groups; + + // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling + mutable RwLock m_MetricsLock; + std::unordered_map<int, ManagedProcess*> m_AllProcesses; // non-owning + std::vector<int> m_KeyOrder; + size_t m_NextSampleIndex = 0; ProcessDataCallback m_DefaultStdoutCallback; ProcessDataCallback m_DefaultStderrCallback; @@ -271,6 +285,13 @@ struct SubprocessManager::Impl { m_MetricsTimer->cancel(); } + + // Destroy groups first (they reference m_Manager back to us) + { + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups.clear(); + } + RemoveAll(); } @@ -279,27 +300,50 @@ struct SubprocessManager::Impl int Pid = Process->Pid(); ManagedProcess* Ptr = Process.get(); - RwLock::ExclusiveLockScope $(m_Lock); - m_Processes[Pid] = std::move(Process); - m_KeyOrder.push_back(Pid); + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + RegisterForMetrics(Pid, Ptr); return Ptr; } + void RegisterForMetrics(int Pid, ManagedProcess* Ptr) + { + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses[Pid] = Ptr; + m_KeyOrder.push_back(Pid); + } + + void UnregisterFromMetrics(int Pid) + { + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.erase(Pid); + m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); + if (m_NextSampleIndex >= m_KeyOrder.size()) + { + m_NextSampleIndex = 0; + } + } + + ManagedProcess* FindProcess(int Pid) const + { + RwLock::SharedLockScope $(m_MetricsLock); + auto It = m_AllProcesses.find(Pid); + if (It != m_AllProcesses.end()) + { + return It->second; + } + return nullptr; + } + void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit) { int Pid = Proc->Pid(); Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) { - ManagedProcess* Found = nullptr; - { - RwLock::SharedLockScope $(m_Lock); - auto It = m_Processes.find(Pid); - if (It != m_Processes.end()) - { - Found = It->second.get(); - } - } + ManagedProcess* Found = FindProcess(Pid); if (Found) { @@ -316,16 +360,7 @@ struct SubprocessManager::Impl Proc->m_Impl->m_StdoutReader->Start( std::move(Pipe), [this, Pid](std::string_view Data) { - ManagedProcess* Found = nullptr; - { - RwLock::SharedLockScope $(m_Lock); - auto It = m_Processes.find(Pid); - if (It != m_Processes.end()) - { - Found = It->second.get(); - } - } - + ManagedProcess* Found = FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StdoutCallback) @@ -352,16 +387,7 @@ struct SubprocessManager::Impl Proc->m_Impl->m_StderrReader->Start( std::move(Pipe), [this, Pid](std::string_view Data) { - ManagedProcess* Found = nullptr; - { - RwLock::SharedLockScope $(m_Lock); - auto It = m_Processes.find(Pid); - if (It != m_Processes.end()) - { - Found = It->second.get(); - } - } - + ManagedProcess* Found = FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StderrCallback) @@ -383,6 +409,8 @@ struct SubprocessManager::Impl void Remove(int Pid) { + UnregisterFromMetrics(Pid); + RwLock::ExclusiveLockScope $(m_Lock); auto It = m_Processes.find(Pid); if (It != m_Processes.end()) @@ -390,11 +418,6 @@ struct SubprocessManager::Impl It->second->m_Impl->CancelAll(); m_Processes.erase(It); } - m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end()); - if (m_NextSampleIndex >= m_KeyOrder.size()) - { - m_NextSampleIndex = 0; - } } void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); } @@ -402,14 +425,21 @@ struct SubprocessManager::Impl void RemoveAll() { - RwLock::ExclusiveLockScope $(m_Lock); - for (auto& [Pid, Proc] : m_Processes) { - Proc->m_Impl->CancelAll(); + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); + } + + { + RwLock::ExclusiveLockScope $(m_MetricsLock); + m_AllProcesses.clear(); + m_KeyOrder.clear(); + m_NextSampleIndex = 0; } - m_Processes.clear(); - m_KeyOrder.clear(); - m_NextSampleIndex = 0; } void EnqueueMetricsTimer() @@ -433,7 +463,7 @@ struct SubprocessManager::Impl void SampleBatch() { - RwLock::SharedLockScope $(m_Lock); + RwLock::SharedLockScope $(m_MetricsLock); if (m_KeyOrder.empty()) { @@ -450,9 +480,9 @@ struct SubprocessManager::Impl } int Pid = m_KeyOrder[m_NextSampleIndex]; - auto It = m_Processes.find(Pid); + auto It = m_AllProcesses.find(Pid); - if (It != m_Processes.end()) + if (It != m_AllProcesses.end()) { It->second->m_Impl->SampleMetrics(); } @@ -466,10 +496,10 @@ struct SubprocessManager::Impl { std::vector<TrackedProcessEntry> Result; - RwLock::SharedLockScope $(m_Lock); - Result.reserve(m_Processes.size()); + RwLock::SharedLockScope $(m_MetricsLock); + Result.reserve(m_AllProcesses.size()); - for (const auto& [Pid, Proc] : m_Processes) + for (const auto& [Pid, Proc] : m_AllProcesses) { TrackedProcessEntry Entry; Entry.Pid = Pid; @@ -485,9 +515,9 @@ struct SubprocessManager::Impl { AggregateProcessMetrics Agg; - RwLock::SharedLockScope $(m_Lock); + RwLock::SharedLockScope $(m_MetricsLock); - for (const auto& [Pid, Proc] : m_Processes) + for (const auto& [Pid, Proc] : m_AllProcesses) { const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; Agg.TotalWorkingSetSize += M.WorkingSetSize; @@ -502,18 +532,63 @@ struct SubprocessManager::Impl [[nodiscard]] size_t GetProcessCount() const { - RwLock::SharedLockScope $(m_Lock); - return m_Processes.size(); + RwLock::SharedLockScope $(m_MetricsLock); + return m_AllProcesses.size(); } void Enumerate(std::function<void(const ManagedProcess&)> Callback) const { - RwLock::SharedLockScope $(m_Lock); - for (const auto& [Pid, Proc] : m_Processes) + RwLock::SharedLockScope $(m_MetricsLock); + for (const auto& [Pid, Proc] : m_AllProcesses) { Callback(*Proc); } } + + ProcessGroup* CreateGroup(std::string Name) + { + auto GroupImpl = std::make_unique<ProcessGroup::Impl>(std::move(Name), *this, m_IoContext); + ProcessGroup* Ptr = nullptr; + + auto Group = std::unique_ptr<ProcessGroup>(new ProcessGroup(std::move(GroupImpl))); + Ptr = Group.get(); + + RwLock::ExclusiveLockScope $(m_GroupsLock); + m_Groups[std::string(Ptr->GetName())] = std::move(Group); + + return Ptr; + } + + void DestroyGroup(std::string_view Name) + { + RwLock::ExclusiveLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + It->second->KillAll(); + m_Groups.erase(It); + } + } + + ProcessGroup* FindGroup(std::string_view Name) const + { + RwLock::SharedLockScope $(m_GroupsLock); + auto It = m_Groups.find(std::string(Name)); + if (It != m_Groups.end()) + { + return It->second.get(); + } + return nullptr; + } + + void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const + { + RwLock::SharedLockScope $(m_GroupsLock); + for (const auto& [Name, Group] : m_Groups) + { + Callback(*Group); + } + } }; // ============================================================================ @@ -630,6 +705,299 @@ SubprocessManager::Enumerate(std::function<void(const ManagedProcess&)> Callback m_Impl->Enumerate(std::move(Callback)); } +ProcessGroup* +SubprocessManager::CreateGroup(std::string Name) +{ + return m_Impl->CreateGroup(std::move(Name)); +} + +void +SubprocessManager::DestroyGroup(std::string_view Name) +{ + m_Impl->DestroyGroup(Name); +} + +ProcessGroup* +SubprocessManager::FindGroup(std::string_view Name) const +{ + return m_Impl->FindGroup(Name); +} + +void +SubprocessManager::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const +{ + m_Impl->EnumerateGroups(std::move(Callback)); +} + +// ============================================================================ +// ProcessGroup::Impl +// ============================================================================ + +struct ProcessGroup::Impl +{ + std::string m_Name; + SubprocessManager::Impl& m_Manager; + asio::io_context& m_IoContext; + + mutable RwLock m_Lock; + std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes; + +#if ZEN_PLATFORM_WINDOWS + JobObject m_JobObject; +#else + int m_Pgid = 0; +#endif + + Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext) + : m_Name(std::move(Name)) + , m_Manager(Manager) + , m_IoContext(IoContext) + { +#if ZEN_PLATFORM_WINDOWS + m_JobObject.Initialize(); +#endif + } + + ~Impl() { KillAll(); } + + ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process) + { + int Pid = Process->Pid(); + ManagedProcess* Ptr = Process.get(); + + { + RwLock::ExclusiveLockScope $(m_Lock); + m_Processes[Pid] = std::move(Process); + } + + m_Manager.RegisterForMetrics(Pid, Ptr); + return Ptr; + } + + void Remove(int Pid) + { + m_Manager.UnregisterFromMetrics(Pid); + + RwLock::ExclusiveLockScope $(m_Lock); + auto It = m_Processes.find(Pid); + if (It != m_Processes.end()) + { + It->second->m_Impl->CancelAll(); + m_Processes.erase(It); + } + } + + void KillAll() + { +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + TerminateJobObject(static_cast<HANDLE>(m_JobObject.Handle()), 1); + } +#else + if (m_Pgid > 0) + { + kill(-m_Pgid, SIGTERM); + } +#endif + // Also kill individually as fallback and clean up + RwLock::ExclusiveLockScope $(m_Lock); + for (auto& [Pid, Proc] : m_Processes) + { + if (Proc->IsRunning()) + { + Proc->Kill(); + } + m_Manager.UnregisterFromMetrics(Pid); + Proc->m_Impl->CancelAll(); + } + m_Processes.clear(); + } + + AggregateProcessMetrics GetAggregateMetrics() const + { + AggregateProcessMetrics Agg; + + RwLock::SharedLockScope $(m_Lock); + + for (const auto& [Pid, Proc] : m_Processes) + { + const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics; + Agg.TotalWorkingSetSize += M.WorkingSetSize; + Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize; + Agg.TotalUserTimeMs += M.UserTimeMs; + Agg.TotalKernelTimeMs += M.KernelTimeMs; + Agg.ProcessCount++; + } + + return Agg; + } + + std::vector<TrackedProcessEntry> GetMetricsSnapshot() const + { + std::vector<TrackedProcessEntry> Result; + + RwLock::SharedLockScope $(m_Lock); + Result.reserve(m_Processes.size()); + + for (const auto& [Pid, Proc] : m_Processes) + { + TrackedProcessEntry Entry; + Entry.Pid = Pid; + Entry.Metrics = Proc->m_Impl->m_LastMetrics; + Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load(); + Result.push_back(std::move(Entry)); + } + + return Result; + } + + [[nodiscard]] size_t GetProcessCount() const + { + RwLock::SharedLockScope $(m_Lock); + return m_Processes.size(); + } + + void Enumerate(std::function<void(const ManagedProcess&)> Callback) const + { + RwLock::SharedLockScope $(m_Lock); + for (const auto& [Pid, Proc] : m_Processes) + { + Callback(*Proc); + } + } +}; + +// ============================================================================ +// ProcessGroup +// ============================================================================ + +ProcessGroup::ProcessGroup(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl)) +{ +} + +ProcessGroup::~ProcessGroup() = default; + +std::string_view +ProcessGroup::GetName() const +{ + return m_Impl->m_Name; +} + +ManagedProcess* +ProcessGroup::Spawn(const std::filesystem::path& Executable, + std::string_view CommandLine, + CreateProcOptions& Options, + ProcessExitCallback OnExit) +{ + bool HasStdout = Options.StdoutPipe != nullptr; + bool HasStderr = Options.StderrPipe != nullptr; + +#if ZEN_PLATFORM_WINDOWS + if (m_Impl->m_JobObject.IsValid()) + { + Options.AssignToJob = &m_Impl->m_JobObject; + } +#else + if (m_Impl->m_Pgid > 0) + { + Options.ProcessGroupId = m_Impl->m_Pgid; + } +#endif + + CreateProcResult Result = CreateProc(Executable, CommandLine, Options); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_Impl->m_IoContext); +#if ZEN_PLATFORM_WINDOWS + ImplPtr->m_Handle.Initialize(Result); +#else + int Pid = static_cast<int>(Result); + ImplPtr->m_Handle.Initialize(Pid); + + // First process becomes the group leader + if (m_Impl->m_Pgid == 0) + { + m_Impl->m_Pgid = Pid; + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = m_Impl->AddProcess(std::move(Proc)); + m_Impl->m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + if (HasStdout) + { + m_Impl->m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe)); + } + if (HasStderr) + { + m_Impl->m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe)); + } + + return Ptr; +} + +ManagedProcess* +ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit) +{ + int Pid = Handle.Pid(); + + auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_Impl->m_IoContext); + ImplPtr->m_Handle.Initialize(Pid); + Handle.Reset(); + +#if ZEN_PLATFORM_WINDOWS + if (m_Impl->m_JobObject.IsValid()) + { + m_Impl->m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle()); + } +#endif + + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); + + ManagedProcess* Ptr = m_Impl->AddProcess(std::move(Proc)); + m_Impl->m_Manager.SetupExitWatcher(Ptr, std::move(OnExit)); + + return Ptr; +} + +void +ProcessGroup::Remove(int Pid) +{ + m_Impl->Remove(Pid); +} + +void +ProcessGroup::KillAll() +{ + m_Impl->KillAll(); +} + +AggregateProcessMetrics +ProcessGroup::GetAggregateMetrics() const +{ + return m_Impl->GetAggregateMetrics(); +} + +std::vector<TrackedProcessEntry> +ProcessGroup::GetMetricsSnapshot() const +{ + return m_Impl->GetMetricsSnapshot(); +} + +size_t +ProcessGroup::GetProcessCount() const +{ + return m_Impl->GetProcessCount(); +} + +void +ProcessGroup::Enumerate(std::function<void(const ManagedProcess&)> Callback) const +{ + m_Impl->Enumerate(std::move(Callback)); +} + } // namespace zen // ============================================================================ @@ -935,6 +1303,165 @@ TEST_CASE("SubprocessManager.UserTag") CHECK(ReceivedTag == "my-worker-1"); } +TEST_CASE("ProcessGroup.SpawnAndMembership") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("test-group"); + REQUIRE(Group != nullptr); + CHECK(Group->GetName() == "test-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine1 = AppStub.string() + " -f=0"; + std::string CmdLine2 = AppStub.string() + " -f=1"; + + Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 2); + + IoContext.run_for(5s); + + CHECK(ExitCount == 2); +} + +TEST_CASE("ProcessGroup.KillAll") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("kill-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int ExitCount = 0; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; }); + + // Let them start + IoContext.run_for(200ms); + CHECK(Group->GetProcessCount() == 2); + + Group->KillAll(); + CHECK(Group->GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.AggregateMetrics") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16}); + + ProcessGroup* Group = Manager.CreateGroup("metrics-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=3"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + // Wait for metrics sampling + IoContext.run_for(1s); + + AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); + CHECK(GroupAgg.ProcessCount == 2); + CHECK(GroupAgg.TotalWorkingSetSize > 0); + + // Manager-level metrics should include group processes + AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics(); + CHECK(ManagerAgg.ProcessCount == 2); + + Group->KillAll(); +} + +TEST_CASE("ProcessGroup.DestroyGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("destroy-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + std::string CmdLine = AppStub.string() + " -t=60"; + + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); + + IoContext.run_for(200ms); + CHECK(Manager.GetProcessCount() == 2); + + Manager.DestroyGroup("destroy-group"); + + CHECK(Manager.FindGroup("destroy-group") == nullptr); + CHECK(Manager.GetProcessCount() == 0); +} + +TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + ProcessGroup* Group = Manager.CreateGroup("mixed-group"); + + std::filesystem::path AppStub = GetAppStubPath(); + + CreateProcOptions Options; + Options.Flags = CreateProcOptions::Flag_NoConsole; + + int GroupExitCount = 0; + int UngroupedExitCode = -1; + + std::string CmdLine = AppStub.string() + " -f=0"; + + // Grouped processes + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; }); + + // Ungrouped process + Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; }); + + CHECK(Group->GetProcessCount() == 2); + CHECK(Manager.GetProcessCount() == 3); + + IoContext.run_for(5s); + + CHECK(GroupExitCount == 2); + CHECK(UngroupedExitCode == 0); +} + +TEST_CASE("ProcessGroup.FindGroup") +{ + asio::io_context IoContext; + SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0}); + + CHECK(Manager.FindGroup("nonexistent") == nullptr); + + ProcessGroup* Group = Manager.CreateGroup("findable"); + CHECK(Manager.FindGroup("findable") == Group); + CHECK(Manager.FindGroup("findable")->GetName() == "findable"); +} + TEST_SUITE_END(); #endif |