about summary refs log tree commit diff
path: root/src/zenutil/process/subprocessmanager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/process/subprocessmanager.cpp')
-rw-r--r--  src/zenutil/process/subprocessmanager.cpp  80
1 file changed, 55 insertions(+), 25 deletions(-)
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp
index 597df3c15..acb518808 100644
--- a/src/zenutil/process/subprocessmanager.cpp
+++ b/src/zenutil/process/subprocessmanager.cpp
@@ -236,7 +236,7 @@ ManagedProcess::GetTag() const
// SubprocessManager::Impl
// ============================================================================
-struct SubprocessManager::Impl
+struct SubprocessManager::Impl : public std::enable_shared_from_this<Impl>
{
asio::io_context& m_IoContext;
SubprocessManagerConfig m_Config;
@@ -308,7 +308,10 @@ SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConf
if (m_Config.MetricsSampleIntervalMs > 0)
{
m_MetricsTimer = std::make_unique<asio::steady_timer>(IoContext);
- EnqueueMetricsTimer();
+ // Don't start the timer here: EnqueueMetricsTimer captures
+ // weak_from_this(), which requires the enclosing shared_ptr to
+ // already own this. The caller (SubprocessManager ctor) invokes
+ // EnqueueMetricsTimer() after the shared_ptr is established.
}
}
@@ -381,8 +384,17 @@ SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallb
{
int Pid = Proc->Pid();
- Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) {
- ManagedProcess* Found = FindProcess(Pid);
+ // Capture a weak_ptr so the handler safely no-ops if the manager is
+ // destroyed (or the process has been Remove()'d) before the exit
+ // completion is dispatched on the io_context.
+ Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [Self = weak_from_this(), Pid, Callback = std::move(OnExit)](int ExitCode) {
+ auto Locked = Self.lock();
+ if (!Locked)
+ {
+ return;
+ }
+
+ ManagedProcess* Found = Locked->FindProcess(Pid);
if (Found)
{
@@ -399,17 +411,22 @@ SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandl
Proc->m_Impl->m_StdoutReader = std::make_unique<AsyncPipeReader>(m_IoContext);
Proc->m_Impl->m_StdoutReader->Start(
std::move(Pipe),
- [this, Pid](std::string_view Data) {
- ManagedProcess* Found = FindProcess(Pid);
+ [Self = weak_from_this(), Pid](std::string_view Data) {
+ auto Locked = Self.lock();
+ if (!Locked)
+ {
+ return;
+ }
+ ManagedProcess* Found = Locked->FindProcess(Pid);
if (Found)
{
if (Found->m_Impl->m_StdoutCallback)
{
Found->m_Impl->m_StdoutCallback(*Found, Data);
}
- else if (m_DefaultStdoutCallback)
+ else if (Locked->m_DefaultStdoutCallback)
{
- m_DefaultStdoutCallback(*Found, Data);
+ Locked->m_DefaultStdoutCallback(*Found, Data);
}
else
{
@@ -427,17 +444,22 @@ SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandl
Proc->m_Impl->m_StderrReader = std::make_unique<AsyncPipeReader>(m_IoContext);
Proc->m_Impl->m_StderrReader->Start(
std::move(Pipe),
- [this, Pid](std::string_view Data) {
- ManagedProcess* Found = FindProcess(Pid);
+ [Self = weak_from_this(), Pid](std::string_view Data) {
+ auto Locked = Self.lock();
+ if (!Locked)
+ {
+ return;
+ }
+ ManagedProcess* Found = Locked->FindProcess(Pid);
if (Found)
{
if (Found->m_Impl->m_StderrCallback)
{
Found->m_Impl->m_StderrCallback(*Found, Data);
}
- else if (m_DefaultStderrCallback)
+ else if (Locked->m_DefaultStderrCallback)
{
- m_DefaultStderrCallback(*Found, Data);
+ Locked->m_DefaultStderrCallback(*Found, Data);
}
else
{
@@ -557,14 +579,19 @@ SubprocessManager::Impl::EnqueueMetricsTimer()
}
m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs));
- m_MetricsTimer->async_wait([this](const asio::error_code& Ec) {
- if (Ec || !m_Running.load())
+ m_MetricsTimer->async_wait([Self = weak_from_this()](const asio::error_code& Ec) {
+ auto Locked = Self.lock();
+ if (!Locked)
+ {
+ return;
+ }
+ if (Ec || !Locked->m_Running.load())
{
return;
}
- SampleBatch();
- EnqueueMetricsTimer();
+ Locked->SampleBatch();
+ Locked->EnqueueMetricsTimer();
});
}
@@ -711,8 +738,11 @@ SubprocessManager::Impl::EnumerateGroups(std::function<void(const ProcessGroup&)
// ============================================================================
SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config)
-: m_Impl(std::make_unique<Impl>(IoContext, Config))
+: m_Impl(std::make_shared<Impl>(IoContext, Config))
{
+ // Start the metrics timer now that the shared_ptr owns the Impl - only
+ // then does weak_from_this() produce a valid weak_ptr for the handler.
+ m_Impl->EnqueueMetricsTimer();
}
SubprocessManager::~SubprocessManager() = default;
@@ -1447,7 +1477,7 @@ TEST_CASE("SubprocessManager.RemoveWhileRunning")
// Let it start
IoContext.run_for(100ms);
- // Remove without killing — callback should NOT fire after this
+ // Remove without killing - callback should NOT fire after this
Manager.Remove(Pid);
IoContext.run_for(500ms);
@@ -1756,7 +1786,7 @@ TEST_CASE("ProcessGroup.FindGroup")
TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
{
- // Seed for reproducibility — change to explore different orderings
+ // Seed for reproducibility - change to explore different orderings
//
// Note that while this is a stress test, it is still single-threaded
@@ -1785,7 +1815,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// Phase 1: Spawn multiple groups with varied workloads
// ========================================================================
- ZEN_INFO("StressTest: Phase 1 — spawning initial groups");
+ ZEN_INFO("StressTest: Phase 1 - spawning initial groups");
constexpr int NumInitialGroups = 8;
std::vector<std::string> GroupNames;
@@ -1839,7 +1869,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// Phase 2: Randomly kill some groups, create replacements, add ungrouped
// ========================================================================
- ZEN_INFO("StressTest: Phase 2 — random group kills and replacements");
+ ZEN_INFO("StressTest: Phase 2 - random group kills and replacements");
constexpr int NumGroupsToKill = 3;
@@ -1904,7 +1934,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// Phase 3: Rapid spawn/exit churn
// ========================================================================
- ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn");
+ ZEN_INFO("StressTest: Phase 3 - rapid spawn/exit churn");
std::atomic<int> ChurnExitCount{0};
int TotalChurnSpawned = 0;
@@ -1928,7 +1958,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// Brief pump to allow some exits to be processed
IoContext.run_for(200ms);
- // Destroy the group — any still-running processes get killed
+ // Destroy the group - any still-running processes get killed
Manager.DestroyGroup(Name);
}
@@ -1938,7 +1968,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// Phase 4: Drain and verify
// ========================================================================
- ZEN_INFO("StressTest: Phase 4 — draining remaining processes");
+ ZEN_INFO("StressTest: Phase 4 - draining remaining processes");
// Check metrics were collected before we wind down
AggregateProcessMetrics Agg = Manager.GetAggregateMetrics();
@@ -1969,7 +1999,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip())
// (exact count is hard to predict due to killed groups, but should be > 0)
CHECK(TotalExitCallbacks.load() > 0);
- ZEN_INFO("StressTest: PASSED — seed={}", Seed);
+ ZEN_INFO("StressTest: PASSED - seed={}", Seed);
}
TEST_SUITE_END();