aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-23 23:27:24 +0100
committerStefan Boberg <[email protected]>2026-03-23 23:27:24 +0100
commit65da422ad9cc5339a17252e30a0e88d757e21586 (patch)
tree2af1ba57474ed2e48d0c8d131857153588b28551
parentupdated comment (diff)
downloadzen-sb/proc1000.tar.xz
zen-sb/proc1000.zip
Add ProcessGroup for OS-backed process groupingsb/proc1000
Introduces ProcessGroup as a first-class object for managing related processes together. On Windows, groups are backed by a JobObject with kill-on-close guarantees. On Linux/macOS, groups use setpgid() for bulk signal delivery via kill(-pgid). Groups provide per-group metrics aggregation, bulk KillAll, and independent Spawn/Adopt/Remove. The SubprocessManager metrics timer samples across both grouped and ungrouped processes via a unified index.
-rw-r--r--src/zencore/include/zencore/process.h12
-rw-r--r--src/zencore/process.cpp5
-rw-r--r--src/zenutil/include/zenutil/process/subprocessmanager.h79
-rw-r--r--src/zenutil/process/subprocessmanager.cpp641
4 files changed, 677 insertions, 60 deletions
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index d115bf11f..ea21a74de 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -171,6 +171,11 @@ struct CreateProcOptions
#if ZEN_PLATFORM_WINDOWS
JobObject* AssignToJob = nullptr; // When set, the process is created suspended, assigned to the job, then resumed
+#else
+ /// POSIX process group id. When > 0, the child is placed into this process
+ /// group via setpgid() before exec. Use the pid of the first child as the
+ /// pgid to create a group, then pass the same pgid for subsequent children.
+ int ProcessGroupId = 0;
#endif
};
@@ -224,9 +229,10 @@ public:
JobObject(const JobObject&) = delete;
JobObject& operator=(const JobObject&) = delete;
- void Initialize();
- bool AssignProcess(void* ProcessHandle);
- [[nodiscard]] bool IsValid() const;
+ void Initialize();
+ bool AssignProcess(void* ProcessHandle);
+ [[nodiscard]] bool IsValid() const;
+ [[nodiscard]] void* Handle() const { return m_JobHandle; }
private:
void* m_JobHandle = nullptr;
diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp
index dcb8b2422..03eef2bc7 100644
--- a/src/zencore/process.cpp
+++ b/src/zencore/process.cpp
@@ -1070,6 +1070,11 @@ CreateProc(const std::filesystem::path& Executable, std::string_view CommandLine
}
}
+ if (Options.ProcessGroupId > 0)
+ {
+ setpgid(0, Options.ProcessGroupId);
+ }
+
for (const auto& [Key, Value] : Options.Environment)
{
setenv(Key.c_str(), Value.c_str(), 1);
diff --git a/src/zenutil/include/zenutil/process/subprocessmanager.h b/src/zenutil/include/zenutil/process/subprocessmanager.h
index 91b887449..f241a4c3e 100644
--- a/src/zenutil/include/zenutil/process/subprocessmanager.h
+++ b/src/zenutil/include/zenutil/process/subprocessmanager.h
@@ -21,6 +21,7 @@ class io_context;
namespace zen {
class ManagedProcess;
+class ProcessGroup;
/// Callback invoked when a managed process exits.
using ProcessExitCallback = std::function<void(ManagedProcess& Process, int ExitCode)>;
@@ -107,7 +108,23 @@ public:
/// Enumerate all managed processes under a shared lock.
void Enumerate(std::function<void(const ManagedProcess&)> Callback) const;
+ /// Create a new process group. The group is owned by this manager.
+ /// On Windows the group is backed by a JobObject (kill-on-close guarantee).
+ /// On POSIX the group uses setpgid for bulk signal delivery.
+ ProcessGroup* CreateGroup(std::string Name);
+
+ /// Destroy a group by name. Kills all processes in the group first.
+ void DestroyGroup(std::string_view Name);
+
+ /// Find a group by name. Returns nullptr if not found.
+ [[nodiscard]] ProcessGroup* FindGroup(std::string_view Name) const;
+
+ /// Enumerate all groups.
+ void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const;
+
private:
+ friend class ProcessGroup;
+
struct Impl;
std::unique_ptr<Impl> m_Impl;
};
@@ -167,6 +184,7 @@ public:
private:
friend class SubprocessManager;
+ friend class ProcessGroup;
struct Impl;
std::unique_ptr<Impl> m_Impl;
@@ -174,6 +192,67 @@ private:
explicit ManagedProcess(std::unique_ptr<Impl> InImpl);
};
+/// A group of managed processes with OS-level backing.
+///
+/// On Windows: backed by a JobObject. All processes assigned on spawn.
+/// Kill-on-close guarantee — if the group is destroyed, the OS terminates
+/// all member processes.
+/// On Linux/macOS: uses setpgid() so children share a process group.
+/// Enables bulk signal delivery via kill(-pgid, sig).
+///
+/// Created via SubprocessManager::CreateGroup(). Not user-constructible.
+class ProcessGroup
+{
+public:
+ ~ProcessGroup();
+
+ ProcessGroup(const ProcessGroup&) = delete;
+ ProcessGroup& operator=(const ProcessGroup&) = delete;
+
+ /// Group name (as passed to CreateGroup).
+ [[nodiscard]] std::string_view GetName() const;
+
+ /// Spawn a process into this group.
+ ManagedProcess* Spawn(const std::filesystem::path& Executable,
+ std::string_view CommandLine,
+ CreateProcOptions& Options,
+ ProcessExitCallback OnExit);
+
+ /// Adopt an already-running process into this group.
+ /// On Windows the process is assigned to the group's JobObject.
+ /// On POSIX the process cannot be moved into a different process group
+ /// after creation, so OS-level grouping is best-effort for adopted processes.
+ ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit);
+
+ /// Remove a process from this group. Does NOT kill it.
+ void Remove(int Pid);
+
+ /// Kill all processes in the group.
+ /// On Windows: uses TerminateJobObject for atomic group kill.
+ /// On POSIX: sends SIGTERM then SIGKILL to the process group.
+ void KillAll();
+
+ /// Aggregate metrics for this group's processes.
+ [[nodiscard]] AggregateProcessMetrics GetAggregateMetrics() const;
+
+ /// Per-process metrics snapshot for this group.
+ [[nodiscard]] std::vector<TrackedProcessEntry> GetMetricsSnapshot() const;
+
+ /// Number of processes in this group.
+ [[nodiscard]] size_t GetProcessCount() const;
+
+ /// Enumerate processes in this group.
+ void Enumerate(std::function<void(const ManagedProcess&)> Callback) const;
+
+private:
+ friend class SubprocessManager;
+
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
+
+ explicit ProcessGroup(std::unique_ptr<Impl> InImpl);
+};
+
void subprocessmanager_forcelink(); // internal
} // namespace zen
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp
index b49f0360b..155327dd3 100644
--- a/src/zenutil/process/subprocessmanager.cpp
+++ b/src/zenutil/process/subprocessmanager.cpp
@@ -15,6 +15,11 @@
#include <vector>
ZEN_THIRD_PARTY_INCLUDES_START
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#else
+# include <csignal>
+#endif
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -244,10 +249,19 @@ struct SubprocessManager::Impl
asio::io_context& m_IoContext;
SubprocessManagerConfig m_Config;
+ // Ungrouped processes
mutable RwLock m_Lock;
std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes;
- std::vector<int> m_KeyOrder;
- size_t m_NextSampleIndex = 0;
+
+ // Groups
+ mutable RwLock m_GroupsLock;
+ std::unordered_map<std::string, std::unique_ptr<ProcessGroup>> m_Groups;
+
+ // Cross-group metrics index: all pids (grouped + ungrouped) for round-robin sampling
+ mutable RwLock m_MetricsLock;
+ std::unordered_map<int, ManagedProcess*> m_AllProcesses; // non-owning
+ std::vector<int> m_KeyOrder;
+ size_t m_NextSampleIndex = 0;
ProcessDataCallback m_DefaultStdoutCallback;
ProcessDataCallback m_DefaultStderrCallback;
@@ -271,6 +285,13 @@ struct SubprocessManager::Impl
{
m_MetricsTimer->cancel();
}
+
+ // Destroy groups first (they reference m_Manager back to us)
+ {
+ RwLock::ExclusiveLockScope $(m_GroupsLock);
+ m_Groups.clear();
+ }
+
RemoveAll();
}
@@ -279,27 +300,50 @@ struct SubprocessManager::Impl
int Pid = Process->Pid();
ManagedProcess* Ptr = Process.get();
- RwLock::ExclusiveLockScope $(m_Lock);
- m_Processes[Pid] = std::move(Process);
- m_KeyOrder.push_back(Pid);
+ {
+ RwLock::ExclusiveLockScope $(m_Lock);
+ m_Processes[Pid] = std::move(Process);
+ }
+ RegisterForMetrics(Pid, Ptr);
return Ptr;
}
+ void RegisterForMetrics(int Pid, ManagedProcess* Ptr)
+ {
+ RwLock::ExclusiveLockScope $(m_MetricsLock);
+ m_AllProcesses[Pid] = Ptr;
+ m_KeyOrder.push_back(Pid);
+ }
+
+ void UnregisterFromMetrics(int Pid)
+ {
+ RwLock::ExclusiveLockScope $(m_MetricsLock);
+ m_AllProcesses.erase(Pid);
+ m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end());
+ if (m_NextSampleIndex >= m_KeyOrder.size())
+ {
+ m_NextSampleIndex = 0;
+ }
+ }
+
+ ManagedProcess* FindProcess(int Pid) const
+ {
+ RwLock::SharedLockScope $(m_MetricsLock);
+ auto It = m_AllProcesses.find(Pid);
+ if (It != m_AllProcesses.end())
+ {
+ return It->second;
+ }
+ return nullptr;
+ }
+
void SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallback OnExit)
{
int Pid = Proc->Pid();
Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) {
- ManagedProcess* Found = nullptr;
- {
- RwLock::SharedLockScope $(m_Lock);
- auto It = m_Processes.find(Pid);
- if (It != m_Processes.end())
- {
- Found = It->second.get();
- }
- }
+ ManagedProcess* Found = FindProcess(Pid);
if (Found)
{
@@ -316,16 +360,7 @@ struct SubprocessManager::Impl
Proc->m_Impl->m_StdoutReader->Start(
std::move(Pipe),
[this, Pid](std::string_view Data) {
- ManagedProcess* Found = nullptr;
- {
- RwLock::SharedLockScope $(m_Lock);
- auto It = m_Processes.find(Pid);
- if (It != m_Processes.end())
- {
- Found = It->second.get();
- }
- }
-
+ ManagedProcess* Found = FindProcess(Pid);
if (Found)
{
if (Found->m_Impl->m_StdoutCallback)
@@ -352,16 +387,7 @@ struct SubprocessManager::Impl
Proc->m_Impl->m_StderrReader->Start(
std::move(Pipe),
[this, Pid](std::string_view Data) {
- ManagedProcess* Found = nullptr;
- {
- RwLock::SharedLockScope $(m_Lock);
- auto It = m_Processes.find(Pid);
- if (It != m_Processes.end())
- {
- Found = It->second.get();
- }
- }
-
+ ManagedProcess* Found = FindProcess(Pid);
if (Found)
{
if (Found->m_Impl->m_StderrCallback)
@@ -383,6 +409,8 @@ struct SubprocessManager::Impl
void Remove(int Pid)
{
+ UnregisterFromMetrics(Pid);
+
RwLock::ExclusiveLockScope $(m_Lock);
auto It = m_Processes.find(Pid);
if (It != m_Processes.end())
@@ -390,11 +418,6 @@ struct SubprocessManager::Impl
It->second->m_Impl->CancelAll();
m_Processes.erase(It);
}
- m_KeyOrder.erase(std::remove(m_KeyOrder.begin(), m_KeyOrder.end(), Pid), m_KeyOrder.end());
- if (m_NextSampleIndex >= m_KeyOrder.size())
- {
- m_NextSampleIndex = 0;
- }
}
void SetDefaultStdoutCallback(ProcessDataCallback Callback) { m_DefaultStdoutCallback = std::move(Callback); }
@@ -402,14 +425,21 @@ struct SubprocessManager::Impl
void RemoveAll()
{
- RwLock::ExclusiveLockScope $(m_Lock);
- for (auto& [Pid, Proc] : m_Processes)
{
- Proc->m_Impl->CancelAll();
+ RwLock::ExclusiveLockScope $(m_Lock);
+ for (auto& [Pid, Proc] : m_Processes)
+ {
+ Proc->m_Impl->CancelAll();
+ }
+ m_Processes.clear();
+ }
+
+ {
+ RwLock::ExclusiveLockScope $(m_MetricsLock);
+ m_AllProcesses.clear();
+ m_KeyOrder.clear();
+ m_NextSampleIndex = 0;
}
- m_Processes.clear();
- m_KeyOrder.clear();
- m_NextSampleIndex = 0;
}
void EnqueueMetricsTimer()
@@ -433,7 +463,7 @@ struct SubprocessManager::Impl
void SampleBatch()
{
- RwLock::SharedLockScope $(m_Lock);
+ RwLock::SharedLockScope $(m_MetricsLock);
if (m_KeyOrder.empty())
{
@@ -450,9 +480,9 @@ struct SubprocessManager::Impl
}
int Pid = m_KeyOrder[m_NextSampleIndex];
- auto It = m_Processes.find(Pid);
+ auto It = m_AllProcesses.find(Pid);
- if (It != m_Processes.end())
+ if (It != m_AllProcesses.end())
{
It->second->m_Impl->SampleMetrics();
}
@@ -466,10 +496,10 @@ struct SubprocessManager::Impl
{
std::vector<TrackedProcessEntry> Result;
- RwLock::SharedLockScope $(m_Lock);
- Result.reserve(m_Processes.size());
+ RwLock::SharedLockScope $(m_MetricsLock);
+ Result.reserve(m_AllProcesses.size());
- for (const auto& [Pid, Proc] : m_Processes)
+ for (const auto& [Pid, Proc] : m_AllProcesses)
{
TrackedProcessEntry Entry;
Entry.Pid = Pid;
@@ -485,9 +515,9 @@ struct SubprocessManager::Impl
{
AggregateProcessMetrics Agg;
- RwLock::SharedLockScope $(m_Lock);
+ RwLock::SharedLockScope $(m_MetricsLock);
- for (const auto& [Pid, Proc] : m_Processes)
+ for (const auto& [Pid, Proc] : m_AllProcesses)
{
const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics;
Agg.TotalWorkingSetSize += M.WorkingSetSize;
@@ -502,18 +532,63 @@ struct SubprocessManager::Impl
[[nodiscard]] size_t GetProcessCount() const
{
- RwLock::SharedLockScope $(m_Lock);
- return m_Processes.size();
+ RwLock::SharedLockScope $(m_MetricsLock);
+ return m_AllProcesses.size();
}
void Enumerate(std::function<void(const ManagedProcess&)> Callback) const
{
- RwLock::SharedLockScope $(m_Lock);
- for (const auto& [Pid, Proc] : m_Processes)
+ RwLock::SharedLockScope $(m_MetricsLock);
+ for (const auto& [Pid, Proc] : m_AllProcesses)
{
Callback(*Proc);
}
}
+
+ ProcessGroup* CreateGroup(std::string Name)
+ {
+ auto GroupImpl = std::make_unique<ProcessGroup::Impl>(std::move(Name), *this, m_IoContext);
+ ProcessGroup* Ptr = nullptr;
+
+ auto Group = std::unique_ptr<ProcessGroup>(new ProcessGroup(std::move(GroupImpl)));
+ Ptr = Group.get();
+
+ RwLock::ExclusiveLockScope $(m_GroupsLock);
+ m_Groups[std::string(Ptr->GetName())] = std::move(Group);
+
+ return Ptr;
+ }
+
+ void DestroyGroup(std::string_view Name)
+ {
+ RwLock::ExclusiveLockScope $(m_GroupsLock);
+ auto It = m_Groups.find(std::string(Name));
+ if (It != m_Groups.end())
+ {
+ It->second->KillAll();
+ m_Groups.erase(It);
+ }
+ }
+
+ ProcessGroup* FindGroup(std::string_view Name) const
+ {
+ RwLock::SharedLockScope $(m_GroupsLock);
+ auto It = m_Groups.find(std::string(Name));
+ if (It != m_Groups.end())
+ {
+ return It->second.get();
+ }
+ return nullptr;
+ }
+
+ void EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const
+ {
+ RwLock::SharedLockScope $(m_GroupsLock);
+ for (const auto& [Name, Group] : m_Groups)
+ {
+ Callback(*Group);
+ }
+ }
};
// ============================================================================
@@ -630,6 +705,299 @@ SubprocessManager::Enumerate(std::function<void(const ManagedProcess&)> Callback
m_Impl->Enumerate(std::move(Callback));
}
+ProcessGroup*
+SubprocessManager::CreateGroup(std::string Name)
+{
+ return m_Impl->CreateGroup(std::move(Name));
+}
+
+void
+SubprocessManager::DestroyGroup(std::string_view Name)
+{
+ m_Impl->DestroyGroup(Name);
+}
+
+ProcessGroup*
+SubprocessManager::FindGroup(std::string_view Name) const
+{
+ return m_Impl->FindGroup(Name);
+}
+
+void
+SubprocessManager::EnumerateGroups(std::function<void(const ProcessGroup&)> Callback) const
+{
+ m_Impl->EnumerateGroups(std::move(Callback));
+}
+
+// ============================================================================
+// ProcessGroup::Impl
+// ============================================================================
+
+struct ProcessGroup::Impl
+{
+ std::string m_Name;
+ SubprocessManager::Impl& m_Manager;
+ asio::io_context& m_IoContext;
+
+ mutable RwLock m_Lock;
+ std::unordered_map<int, std::unique_ptr<ManagedProcess>> m_Processes;
+
+#if ZEN_PLATFORM_WINDOWS
+ JobObject m_JobObject;
+#else
+ int m_Pgid = 0;
+#endif
+
+ Impl(std::string Name, SubprocessManager::Impl& Manager, asio::io_context& IoContext)
+ : m_Name(std::move(Name))
+ , m_Manager(Manager)
+ , m_IoContext(IoContext)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ m_JobObject.Initialize();
+#endif
+ }
+
+ ~Impl() { KillAll(); }
+
+ ManagedProcess* AddProcess(std::unique_ptr<ManagedProcess> Process)
+ {
+ int Pid = Process->Pid();
+ ManagedProcess* Ptr = Process.get();
+
+ {
+ RwLock::ExclusiveLockScope $(m_Lock);
+ m_Processes[Pid] = std::move(Process);
+ }
+
+ m_Manager.RegisterForMetrics(Pid, Ptr);
+ return Ptr;
+ }
+
+ void Remove(int Pid)
+ {
+ m_Manager.UnregisterFromMetrics(Pid);
+
+ RwLock::ExclusiveLockScope $(m_Lock);
+ auto It = m_Processes.find(Pid);
+ if (It != m_Processes.end())
+ {
+ It->second->m_Impl->CancelAll();
+ m_Processes.erase(It);
+ }
+ }
+
+ void KillAll()
+ {
+#if ZEN_PLATFORM_WINDOWS
+ if (m_JobObject.IsValid())
+ {
+ TerminateJobObject(static_cast<HANDLE>(m_JobObject.Handle()), 1);
+ }
+#else
+ if (m_Pgid > 0)
+ {
+ kill(-m_Pgid, SIGTERM);
+ }
+#endif
+ // Also kill individually as fallback and clean up
+ RwLock::ExclusiveLockScope $(m_Lock);
+ for (auto& [Pid, Proc] : m_Processes)
+ {
+ if (Proc->IsRunning())
+ {
+ Proc->Kill();
+ }
+ m_Manager.UnregisterFromMetrics(Pid);
+ Proc->m_Impl->CancelAll();
+ }
+ m_Processes.clear();
+ }
+
+ AggregateProcessMetrics GetAggregateMetrics() const
+ {
+ AggregateProcessMetrics Agg;
+
+ RwLock::SharedLockScope $(m_Lock);
+
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ const ProcessMetrics& M = Proc->m_Impl->m_LastMetrics;
+ Agg.TotalWorkingSetSize += M.WorkingSetSize;
+ Agg.TotalPeakWorkingSetSize += M.PeakWorkingSetSize;
+ Agg.TotalUserTimeMs += M.UserTimeMs;
+ Agg.TotalKernelTimeMs += M.KernelTimeMs;
+ Agg.ProcessCount++;
+ }
+
+ return Agg;
+ }
+
+ std::vector<TrackedProcessEntry> GetMetricsSnapshot() const
+ {
+ std::vector<TrackedProcessEntry> Result;
+
+ RwLock::SharedLockScope $(m_Lock);
+ Result.reserve(m_Processes.size());
+
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ TrackedProcessEntry Entry;
+ Entry.Pid = Pid;
+ Entry.Metrics = Proc->m_Impl->m_LastMetrics;
+ Entry.CpuUsagePercent = Proc->m_Impl->m_CpuUsagePercent.load();
+ Result.push_back(std::move(Entry));
+ }
+
+ return Result;
+ }
+
+ [[nodiscard]] size_t GetProcessCount() const
+ {
+ RwLock::SharedLockScope $(m_Lock);
+ return m_Processes.size();
+ }
+
+ void Enumerate(std::function<void(const ManagedProcess&)> Callback) const
+ {
+ RwLock::SharedLockScope $(m_Lock);
+ for (const auto& [Pid, Proc] : m_Processes)
+ {
+ Callback(*Proc);
+ }
+ }
+};
+
+// ============================================================================
+// ProcessGroup
+// ============================================================================
+
+ProcessGroup::ProcessGroup(std::unique_ptr<Impl> InImpl) : m_Impl(std::move(InImpl))
+{
+}
+
+ProcessGroup::~ProcessGroup() = default;
+
+std::string_view
+ProcessGroup::GetName() const
+{
+ return m_Impl->m_Name;
+}
+
+ManagedProcess*
+ProcessGroup::Spawn(const std::filesystem::path& Executable,
+ std::string_view CommandLine,
+ CreateProcOptions& Options,
+ ProcessExitCallback OnExit)
+{
+ bool HasStdout = Options.StdoutPipe != nullptr;
+ bool HasStderr = Options.StderrPipe != nullptr;
+
+#if ZEN_PLATFORM_WINDOWS
+ if (m_Impl->m_JobObject.IsValid())
+ {
+ Options.AssignToJob = &m_Impl->m_JobObject;
+ }
+#else
+ if (m_Impl->m_Pgid > 0)
+ {
+ Options.ProcessGroupId = m_Impl->m_Pgid;
+ }
+#endif
+
+ CreateProcResult Result = CreateProc(Executable, CommandLine, Options);
+
+ auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_Impl->m_IoContext);
+#if ZEN_PLATFORM_WINDOWS
+ ImplPtr->m_Handle.Initialize(Result);
+#else
+ int Pid = static_cast<int>(Result);
+ ImplPtr->m_Handle.Initialize(Pid);
+
+ // First process becomes the group leader
+ if (m_Impl->m_Pgid == 0)
+ {
+ m_Impl->m_Pgid = Pid;
+ }
+#endif
+
+ auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));
+
+ ManagedProcess* Ptr = m_Impl->AddProcess(std::move(Proc));
+ m_Impl->m_Manager.SetupExitWatcher(Ptr, std::move(OnExit));
+
+ if (HasStdout)
+ {
+ m_Impl->m_Manager.SetupStdoutReader(Ptr, std::move(*Options.StdoutPipe));
+ }
+ if (HasStderr)
+ {
+ m_Impl->m_Manager.SetupStderrReader(Ptr, std::move(*Options.StderrPipe));
+ }
+
+ return Ptr;
+}
+
+ManagedProcess*
+ProcessGroup::Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit)
+{
+ int Pid = Handle.Pid();
+
+ auto ImplPtr = std::make_unique<ManagedProcess::Impl>(m_Impl->m_IoContext);
+ ImplPtr->m_Handle.Initialize(Pid);
+ Handle.Reset();
+
+#if ZEN_PLATFORM_WINDOWS
+ if (m_Impl->m_JobObject.IsValid())
+ {
+ m_Impl->m_JobObject.AssignProcess(ImplPtr->m_Handle.Handle());
+ }
+#endif
+
+ auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));
+
+ ManagedProcess* Ptr = m_Impl->AddProcess(std::move(Proc));
+ m_Impl->m_Manager.SetupExitWatcher(Ptr, std::move(OnExit));
+
+ return Ptr;
+}
+
+void
+ProcessGroup::Remove(int Pid)
+{
+ m_Impl->Remove(Pid);
+}
+
+void
+ProcessGroup::KillAll()
+{
+ m_Impl->KillAll();
+}
+
+AggregateProcessMetrics
+ProcessGroup::GetAggregateMetrics() const
+{
+ return m_Impl->GetAggregateMetrics();
+}
+
+std::vector<TrackedProcessEntry>
+ProcessGroup::GetMetricsSnapshot() const
+{
+ return m_Impl->GetMetricsSnapshot();
+}
+
+size_t
+ProcessGroup::GetProcessCount() const
+{
+ return m_Impl->GetProcessCount();
+}
+
+void
+ProcessGroup::Enumerate(std::function<void(const ManagedProcess&)> Callback) const
+{
+ m_Impl->Enumerate(std::move(Callback));
+}
+
} // namespace zen
// ============================================================================
@@ -935,6 +1303,165 @@ TEST_CASE("SubprocessManager.UserTag")
CHECK(ReceivedTag == "my-worker-1");
}
+TEST_CASE("ProcessGroup.SpawnAndMembership")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});
+
+ ProcessGroup* Group = Manager.CreateGroup("test-group");
+ REQUIRE(Group != nullptr);
+ CHECK(Group->GetName() == "test-group");
+
+ std::filesystem::path AppStub = GetAppStubPath();
+
+ CreateProcOptions Options;
+ Options.Flags = CreateProcOptions::Flag_NoConsole;
+
+ int ExitCount = 0;
+
+ std::string CmdLine1 = AppStub.string() + " -f=0";
+ std::string CmdLine2 = AppStub.string() + " -f=1";
+
+ Group->Spawn(AppStub, CmdLine1, Options, [&](ManagedProcess&, int) { ExitCount++; });
+ Group->Spawn(AppStub, CmdLine2, Options, [&](ManagedProcess&, int) { ExitCount++; });
+
+ CHECK(Group->GetProcessCount() == 2);
+ CHECK(Manager.GetProcessCount() == 2);
+
+ IoContext.run_for(5s);
+
+ CHECK(ExitCount == 2);
+}
+
+TEST_CASE("ProcessGroup.KillAll")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});
+
+ ProcessGroup* Group = Manager.CreateGroup("kill-group");
+
+ std::filesystem::path AppStub = GetAppStubPath();
+
+ CreateProcOptions Options;
+ Options.Flags = CreateProcOptions::Flag_NoConsole;
+
+ int ExitCount = 0;
+
+ std::string CmdLine = AppStub.string() + " -t=60";
+
+ Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; });
+ Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { ExitCount++; });
+
+ // Let them start
+ IoContext.run_for(200ms);
+ CHECK(Group->GetProcessCount() == 2);
+
+ Group->KillAll();
+ CHECK(Group->GetProcessCount() == 0);
+}
+
+TEST_CASE("ProcessGroup.AggregateMetrics")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 100, .MetricsBatchSize = 16});
+
+ ProcessGroup* Group = Manager.CreateGroup("metrics-group");
+
+ std::filesystem::path AppStub = GetAppStubPath();
+
+ CreateProcOptions Options;
+ Options.Flags = CreateProcOptions::Flag_NoConsole;
+
+ std::string CmdLine = AppStub.string() + " -t=3";
+
+ Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
+ Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
+
+ // Wait for metrics sampling
+ IoContext.run_for(1s);
+
+ AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics();
+ CHECK(GroupAgg.ProcessCount == 2);
+ CHECK(GroupAgg.TotalWorkingSetSize > 0);
+
+ // Manager-level metrics should include group processes
+ AggregateProcessMetrics ManagerAgg = Manager.GetAggregateMetrics();
+ CHECK(ManagerAgg.ProcessCount == 2);
+
+ Group->KillAll();
+}
+
+TEST_CASE("ProcessGroup.DestroyGroup")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});
+
+ ProcessGroup* Group = Manager.CreateGroup("destroy-group");
+
+ std::filesystem::path AppStub = GetAppStubPath();
+
+ CreateProcOptions Options;
+ Options.Flags = CreateProcOptions::Flag_NoConsole;
+
+ std::string CmdLine = AppStub.string() + " -t=60";
+
+ Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
+ Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
+
+ IoContext.run_for(200ms);
+ CHECK(Manager.GetProcessCount() == 2);
+
+ Manager.DestroyGroup("destroy-group");
+
+ CHECK(Manager.FindGroup("destroy-group") == nullptr);
+ CHECK(Manager.GetProcessCount() == 0);
+}
+
+TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});
+
+ ProcessGroup* Group = Manager.CreateGroup("mixed-group");
+
+ std::filesystem::path AppStub = GetAppStubPath();
+
+ CreateProcOptions Options;
+ Options.Flags = CreateProcOptions::Flag_NoConsole;
+
+ int GroupExitCount = 0;
+ int UngroupedExitCode = -1;
+
+ std::string CmdLine = AppStub.string() + " -f=0";
+
+ // Grouped processes
+ Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; });
+ Group->Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { GroupExitCount++; });
+
+ // Ungrouped process
+ Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int ExitCode) { UngroupedExitCode = ExitCode; });
+
+ CHECK(Group->GetProcessCount() == 2);
+ CHECK(Manager.GetProcessCount() == 3);
+
+ IoContext.run_for(5s);
+
+ CHECK(GroupExitCount == 2);
+ CHECK(UngroupedExitCode == 0);
+}
+
+TEST_CASE("ProcessGroup.FindGroup")
+{
+ asio::io_context IoContext;
+ SubprocessManager Manager(IoContext, {.MetricsSampleIntervalMs = 0});
+
+ CHECK(Manager.FindGroup("nonexistent") == nullptr);
+
+ ProcessGroup* Group = Manager.CreateGroup("findable");
+ CHECK(Manager.FindGroup("findable") == Group);
+ CHECK(Manager.FindGroup("findable")->GetName() == "findable");
+}
+
TEST_SUITE_END();
#endif