diff options
Diffstat (limited to 'src/zenutil/process/subprocessmanager.cpp')
| -rw-r--r-- | src/zenutil/process/subprocessmanager.cpp | 80 |
1 file changed, 55 insertions, 25 deletions
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp index 597df3c15..acb518808 100644 --- a/src/zenutil/process/subprocessmanager.cpp +++ b/src/zenutil/process/subprocessmanager.cpp @@ -236,7 +236,7 @@ ManagedProcess::GetTag() const // SubprocessManager::Impl // ============================================================================ -struct SubprocessManager::Impl +struct SubprocessManager::Impl : public std::enable_shared_from_this<Impl> { asio::io_context& m_IoContext; SubprocessManagerConfig m_Config; @@ -308,7 +308,10 @@ SubprocessManager::Impl::Impl(asio::io_context& IoContext, SubprocessManagerConf if (m_Config.MetricsSampleIntervalMs > 0) { m_MetricsTimer = std::make_unique<asio::steady_timer>(IoContext); - EnqueueMetricsTimer(); + // Don't start the timer here: EnqueueMetricsTimer captures + // weak_from_this(), which requires the enclosing shared_ptr to + // already own this. The caller (SubprocessManager ctor) invokes + // EnqueueMetricsTimer() after the shared_ptr is established. } } @@ -381,8 +384,17 @@ SubprocessManager::Impl::SetupExitWatcher(ManagedProcess* Proc, ProcessExitCallb { int Pid = Proc->Pid(); - Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [this, Pid, Callback = std::move(OnExit)](int ExitCode) { - ManagedProcess* Found = FindProcess(Pid); + // Capture a weak_ptr so the handler safely no-ops if the manager is + // destroyed (or the process has been Remove()'d) before the exit + // completion is dispatched on the io_context. 
+ Proc->m_Impl->m_ExitWatcher.Watch(Proc->m_Impl->m_Handle, [Self = weak_from_this(), Pid, Callback = std::move(OnExit)](int ExitCode) { + auto Locked = Self.lock(); + if (!Locked) + { + return; + } + + ManagedProcess* Found = Locked->FindProcess(Pid); if (Found) { @@ -399,17 +411,22 @@ SubprocessManager::Impl::SetupStdoutReader(ManagedProcess* Proc, StdoutPipeHandl Proc->m_Impl->m_StdoutReader = std::make_unique<AsyncPipeReader>(m_IoContext); Proc->m_Impl->m_StdoutReader->Start( std::move(Pipe), - [this, Pid](std::string_view Data) { - ManagedProcess* Found = FindProcess(Pid); + [Self = weak_from_this(), Pid](std::string_view Data) { + auto Locked = Self.lock(); + if (!Locked) + { + return; + } + ManagedProcess* Found = Locked->FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StdoutCallback) { Found->m_Impl->m_StdoutCallback(*Found, Data); } - else if (m_DefaultStdoutCallback) + else if (Locked->m_DefaultStdoutCallback) { - m_DefaultStdoutCallback(*Found, Data); + Locked->m_DefaultStdoutCallback(*Found, Data); } else { @@ -427,17 +444,22 @@ SubprocessManager::Impl::SetupStderrReader(ManagedProcess* Proc, StdoutPipeHandl Proc->m_Impl->m_StderrReader = std::make_unique<AsyncPipeReader>(m_IoContext); Proc->m_Impl->m_StderrReader->Start( std::move(Pipe), - [this, Pid](std::string_view Data) { - ManagedProcess* Found = FindProcess(Pid); + [Self = weak_from_this(), Pid](std::string_view Data) { + auto Locked = Self.lock(); + if (!Locked) + { + return; + } + ManagedProcess* Found = Locked->FindProcess(Pid); if (Found) { if (Found->m_Impl->m_StderrCallback) { Found->m_Impl->m_StderrCallback(*Found, Data); } - else if (m_DefaultStderrCallback) + else if (Locked->m_DefaultStderrCallback) { - m_DefaultStderrCallback(*Found, Data); + Locked->m_DefaultStderrCallback(*Found, Data); } else { @@ -557,14 +579,19 @@ SubprocessManager::Impl::EnqueueMetricsTimer() } m_MetricsTimer->expires_after(std::chrono::milliseconds(m_Config.MetricsSampleIntervalMs)); - 
m_MetricsTimer->async_wait([this](const asio::error_code& Ec) { - if (Ec || !m_Running.load()) + m_MetricsTimer->async_wait([Self = weak_from_this()](const asio::error_code& Ec) { + auto Locked = Self.lock(); + if (!Locked) + { + return; + } + if (Ec || !Locked->m_Running.load()) { return; } - SampleBatch(); - EnqueueMetricsTimer(); + Locked->SampleBatch(); + Locked->EnqueueMetricsTimer(); }); } @@ -711,8 +738,11 @@ SubprocessManager::Impl::EnumerateGroups(std::function<void(const ProcessGroup&) // ============================================================================ SubprocessManager::SubprocessManager(asio::io_context& IoContext, SubprocessManagerConfig Config) -: m_Impl(std::make_unique<Impl>(IoContext, Config)) +: m_Impl(std::make_shared<Impl>(IoContext, Config)) { + // Start the metrics timer now that the shared_ptr owns the Impl - only + // then does weak_from_this() produce a valid weak_ptr for the handler. + m_Impl->EnqueueMetricsTimer(); } SubprocessManager::~SubprocessManager() = default; @@ -1447,7 +1477,7 @@ TEST_CASE("SubprocessManager.RemoveWhileRunning") // Let it start IoContext.run_for(100ms); - // Remove without killing — callback should NOT fire after this + // Remove without killing - callback should NOT fire after this Manager.Remove(Pid); IoContext.run_for(500ms); @@ -1756,7 +1786,7 @@ TEST_CASE("ProcessGroup.FindGroup") TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) { - // Seed for reproducibility — change to explore different orderings + // Seed for reproducibility - change to explore different orderings // // Note that while this is a stress test, it is still single-threaded @@ -1785,7 +1815,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // Phase 1: Spawn multiple groups with varied workloads // ======================================================================== - ZEN_INFO("StressTest: Phase 1 — spawning initial groups"); + ZEN_INFO("StressTest: Phase 1 - spawning initial groups"); constexpr int 
NumInitialGroups = 8; std::vector<std::string> GroupNames; @@ -1839,7 +1869,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // Phase 2: Randomly kill some groups, create replacements, add ungrouped // ======================================================================== - ZEN_INFO("StressTest: Phase 2 — random group kills and replacements"); + ZEN_INFO("StressTest: Phase 2 - random group kills and replacements"); constexpr int NumGroupsToKill = 3; @@ -1904,7 +1934,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // Phase 3: Rapid spawn/exit churn // ======================================================================== - ZEN_INFO("StressTest: Phase 3 — rapid spawn/exit churn"); + ZEN_INFO("StressTest: Phase 3 - rapid spawn/exit churn"); std::atomic<int> ChurnExitCount{0}; int TotalChurnSpawned = 0; @@ -1928,7 +1958,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // Brief pump to allow some exits to be processed IoContext.run_for(200ms); - // Destroy the group — any still-running processes get killed + // Destroy the group - any still-running processes get killed Manager.DestroyGroup(Name); } @@ -1938,7 +1968,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // Phase 4: Drain and verify // ======================================================================== - ZEN_INFO("StressTest: Phase 4 — draining remaining processes"); + ZEN_INFO("StressTest: Phase 4 - draining remaining processes"); // Check metrics were collected before we wind down AggregateProcessMetrics Agg = Manager.GetAggregateMetrics(); @@ -1969,7 +1999,7 @@ TEST_CASE("SubprocessManager.StressTest" * doctest::skip()) // (exact count is hard to predict due to killed groups, but should be > 0) CHECK(TotalExitCallbacks.load() > 0); - ZEN_INFO("StressTest: PASSED — seed={}", Seed); + ZEN_INFO("StressTest: PASSED - seed={}", Seed); } TEST_SUITE_END(); |