diff options
Diffstat (limited to 'src/zenutil/process')
| -rw-r--r-- | src/zenutil/process/subprocessmanager.cpp | 246 |
1 files changed, 206 insertions, 40 deletions
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp index 3a91b0a61..597df3c15 100644 --- a/src/zenutil/process/subprocessmanager.cpp +++ b/src/zenutil/process/subprocessmanager.cpp @@ -196,18 +196,6 @@ ManagedProcess::GetCpuUsagePercent() const return m_Impl->m_CpuUsagePercent.load(); } -void -ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback) -{ - m_Impl->m_StdoutCallback = std::move(Callback); -} - -void -ManagedProcess::SetStderrCallback(ProcessDataCallback Callback) -{ - m_Impl->m_StderrCallback = std::move(Callback); -} - std::string ManagedProcess::GetCapturedStdout() const { @@ -288,7 +276,9 @@ struct SubprocessManager::Impl ManagedProcess* Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit); + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr); ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); void Remove(int Pid); void RemoveAll(); @@ -462,7 +452,9 @@ ManagedProcess* SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit) + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr) { bool HasStdout = Options.StdoutPipe != nullptr; bool HasStderr = Options.StderrPipe != nullptr; @@ -476,6 +468,16 @@ SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable, ImplPtr->m_Handle.Initialize(static_cast<int>(Result)); #endif + // Install callbacks before starting async readers so no data is missed. + if (OnStdout) + { + ImplPtr->m_StdoutCallback = std::move(OnStdout); + } + if (OnStderr) + { + ImplPtr->m_StderrCallback = std::move(OnStderr); + } + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); @@ -719,10 +721,12 @@ ManagedProcess* SubprocessManager::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit) + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr) { ZEN_TRACE_CPU("SubprocessManager::Spawn"); - return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit), std::move(OnStdout), std::move(OnStderr)); } ManagedProcess* @@ -835,7 +839,9 @@ struct ProcessGroup::Impl ManagedProcess* Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit); + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr); ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit); void Remove(int Pid); void KillAll(); @@ -884,7 +890,9 @@ ManagedProcess* ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit) + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr) { bool HasStdout = Options.StdoutPipe != nullptr; bool HasStderr = Options.StderrPipe != nullptr; @@ -895,7 +903,11 @@ ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, Options.AssignToJob = &m_JobObject; } #else - if (m_Pgid > 0) + if (m_Pgid == 0) + { + Options.Flags |= CreateProcOptions::Flag_NewProcessGroup; + } + else { Options.ProcessGroupId = m_Pgid; } @@ -917,6 +929,16 @@ ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable, } #endif + // Install callbacks before starting async readers so no data is missed. + if (OnStdout) + { + ImplPtr->m_StdoutCallback = std::move(OnStdout); + } + if (OnStderr) + { + ImplPtr->m_StderrCallback = std::move(OnStderr); + } + auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr))); ManagedProcess* Ptr = AddProcess(std::move(Proc)); @@ -1077,10 +1099,12 @@ ManagedProcess* ProcessGroup::Spawn(const std::filesystem::path& Executable, std::string_view CommandLine, CreateProcOptions& Options, - ProcessExitCallback OnExit) + ProcessExitCallback OnExit, + ProcessDataCallback OnStdout, + ProcessDataCallback OnStderr) { ZEN_TRACE_CPU("ProcessGroup::Spawn"); - return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit)); + return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit), std::move(OnStdout), std::move(OnStderr)); } ManagedProcess* @@ -1185,7 +1209,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectExit") CallbackFired = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); CHECK(ReceivedExitCode == 42); @@ -1210,7 +1244,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit") CallbackFired = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); CHECK(ReceivedExitCode == 0); @@ -1235,7 +1279,17 @@ TEST_CASE("SubprocessManager.StdoutCapture") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); std::string Captured = Proc->GetCapturedStdout(); @@ -1264,7 +1318,17 @@ TEST_CASE("SubprocessManager.StderrCapture") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); std::string CapturedErr = Proc->GetCapturedStderr(); @@ -1289,11 +1353,24 @@ TEST_CASE("SubprocessManager.StdoutCallback") std::string ReceivedData; bool Exited = false; - ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); + ManagedProcess* Proc = Manager.Spawn( + AppStub, + CmdLine, + Options, + [&](ManagedProcess&, int) { Exited = true; }, + [&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); - Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); - - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); CHECK(ReceivedData.find("callback_test") != std::string::npos); @@ -1316,8 +1393,18 @@ TEST_CASE("SubprocessManager.MetricsSampling") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - // Run for enough time to get metrics samples - IoContext.run_for(1s); + // Poll until metrics are available + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Proc->GetLatestMetrics().WorkingSetSize > 0) + { + break; + } + } + } ProcessMetrics Metrics = Proc->GetLatestMetrics(); CHECK(Metrics.WorkingSetSize > 0); @@ -1326,7 +1413,17 @@ TEST_CASE("SubprocessManager.MetricsSampling") CHECK(Snapshot.size() == 1); // Let it finish - IoContext.run_for(3s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 10'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); } @@ -1375,12 +1472,31 @@ TEST_CASE("SubprocessManager.KillAndWaitForExit") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); // Let it start - IoContext.run_for(200ms); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Proc->IsRunning()) + { + break; + } + } + } Proc->Kill(); - IoContext.run_for(2s); - + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); } @@ -1401,7 +1517,17 @@ TEST_CASE("SubprocessManager.AdoptProcess") Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (ReceivedExitCode != -1) + { + break; + } + } + } CHECK(ReceivedExitCode == 7); } @@ -1424,7 +1550,17 @@ TEST_CASE("SubprocessManager.UserTag") Proc->SetTag("my-worker-1"); CHECK(Proc->GetTag() == "my-worker-1"); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (!ReceivedTag.empty()) + { + break; + } + } + } CHECK(ReceivedTag == "my-worker-1"); } @@ -1454,7 +1590,17 @@ TEST_CASE("ProcessGroup.SpawnAndMembership") CHECK(Group->GetProcessCount() == 2); CHECK(Manager.GetProcessCount() == 2); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (ExitCount == 2) + { + break; + } + } + } CHECK(ExitCount == 2); } @@ -1504,7 +1650,17 @@ TEST_CASE("ProcessGroup.AggregateMetrics") Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); // Wait for metrics sampling - IoContext.run_for(1s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Group->GetAggregateMetrics().TotalWorkingSetSize > 0) + { + break; + } + } + } AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); CHECK(GroupAgg.ProcessCount == 2); @@ -1570,7 +1726,17 @@ TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") CHECK(Group->GetProcessCount() == 2); CHECK(Manager.GetProcessCount() == 3); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (GroupExitCount == 2 && UngroupedExitCode != -1) + { + break; + } + } + } CHECK(GroupExitCount == 2); CHECK(UngroupedExitCode == 0); |