aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/process
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/process')
-rw-r--r--src/zenutil/process/subprocessmanager.cpp246
1 files changed, 206 insertions, 40 deletions
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp
index 3a91b0a61..597df3c15 100644
--- a/src/zenutil/process/subprocessmanager.cpp
+++ b/src/zenutil/process/subprocessmanager.cpp
@@ -196,18 +196,6 @@ ManagedProcess::GetCpuUsagePercent() const
return m_Impl->m_CpuUsagePercent.load();
}
-void
-ManagedProcess::SetStdoutCallback(ProcessDataCallback Callback)
-{
- m_Impl->m_StdoutCallback = std::move(Callback);
-}
-
-void
-ManagedProcess::SetStderrCallback(ProcessDataCallback Callback)
-{
- m_Impl->m_StderrCallback = std::move(Callback);
-}
-
std::string
ManagedProcess::GetCapturedStdout() const
{
@@ -288,7 +276,9 @@ struct SubprocessManager::Impl
ManagedProcess* Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit);
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr);
ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit);
void Remove(int Pid);
void RemoveAll();
@@ -462,7 +452,9 @@ ManagedProcess*
SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit)
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr)
{
bool HasStdout = Options.StdoutPipe != nullptr;
bool HasStderr = Options.StderrPipe != nullptr;
@@ -476,6 +468,16 @@ SubprocessManager::Impl::Spawn(const std::filesystem::path& Executable,
ImplPtr->m_Handle.Initialize(static_cast<int>(Result));
#endif
+ // Install callbacks before starting async readers so no data is missed.
+ if (OnStdout)
+ {
+ ImplPtr->m_StdoutCallback = std::move(OnStdout);
+ }
+ if (OnStderr)
+ {
+ ImplPtr->m_StderrCallback = std::move(OnStderr);
+ }
+
auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));
ManagedProcess* Ptr = AddProcess(std::move(Proc));
@@ -719,10 +721,12 @@ ManagedProcess*
SubprocessManager::Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit)
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr)
{
ZEN_TRACE_CPU("SubprocessManager::Spawn");
- return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit));
+ return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit), std::move(OnStdout), std::move(OnStderr));
}
ManagedProcess*
@@ -835,7 +839,9 @@ struct ProcessGroup::Impl
ManagedProcess* Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit);
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr);
ManagedProcess* Adopt(ProcessHandle&& Handle, ProcessExitCallback OnExit);
void Remove(int Pid);
void KillAll();
@@ -884,7 +890,9 @@ ManagedProcess*
ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit)
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr)
{
bool HasStdout = Options.StdoutPipe != nullptr;
bool HasStderr = Options.StderrPipe != nullptr;
@@ -895,7 +903,11 @@ ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable,
Options.AssignToJob = &m_JobObject;
}
#else
- if (m_Pgid > 0)
+ if (m_Pgid == 0)
+ {
+ Options.Flags |= CreateProcOptions::Flag_NewProcessGroup;
+ }
+ else
{
Options.ProcessGroupId = m_Pgid;
}
@@ -917,6 +929,16 @@ ProcessGroup::Impl::Spawn(const std::filesystem::path& Executable,
}
#endif
+ // Install callbacks before starting async readers so no data is missed.
+ if (OnStdout)
+ {
+ ImplPtr->m_StdoutCallback = std::move(OnStdout);
+ }
+ if (OnStderr)
+ {
+ ImplPtr->m_StderrCallback = std::move(OnStderr);
+ }
+
auto Proc = std::unique_ptr<ManagedProcess>(new ManagedProcess(std::move(ImplPtr)));
ManagedProcess* Ptr = AddProcess(std::move(Proc));
@@ -1077,10 +1099,12 @@ ManagedProcess*
ProcessGroup::Spawn(const std::filesystem::path& Executable,
std::string_view CommandLine,
CreateProcOptions& Options,
- ProcessExitCallback OnExit)
+ ProcessExitCallback OnExit,
+ ProcessDataCallback OnStdout,
+ ProcessDataCallback OnStderr)
{
ZEN_TRACE_CPU("ProcessGroup::Spawn");
- return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit));
+ return m_Impl->Spawn(Executable, CommandLine, Options, std::move(OnExit), std::move(OnStdout), std::move(OnStderr));
}
ManagedProcess*
@@ -1185,7 +1209,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectExit")
CallbackFired = true;
});
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
CHECK(ReceivedExitCode == 42);
@@ -1210,7 +1244,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit")
CallbackFired = true;
});
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
CHECK(ReceivedExitCode == 0);
@@ -1235,7 +1279,17 @@ TEST_CASE("SubprocessManager.StdoutCapture")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
std::string Captured = Proc->GetCapturedStdout();
@@ -1264,7 +1318,17 @@ TEST_CASE("SubprocessManager.StderrCapture")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
std::string CapturedErr = Proc->GetCapturedStderr();
@@ -1289,11 +1353,24 @@ TEST_CASE("SubprocessManager.StdoutCallback")
std::string ReceivedData;
bool Exited = false;
- ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
+ ManagedProcess* Proc = Manager.Spawn(
+ AppStub,
+ CmdLine,
+ Options,
+ [&](ManagedProcess&, int) { Exited = true; },
+ [&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); });
- Proc->SetStdoutCallback([&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); });
-
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
CHECK(ReceivedData.find("callback_test") != std::string::npos);
@@ -1316,8 +1393,18 @@ TEST_CASE("SubprocessManager.MetricsSampling")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- // Run for enough time to get metrics samples
- IoContext.run_for(1s);
+ // Poll until metrics are available
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Proc->GetLatestMetrics().WorkingSetSize > 0)
+ {
+ break;
+ }
+ }
+ }
ProcessMetrics Metrics = Proc->GetLatestMetrics();
CHECK(Metrics.WorkingSetSize > 0);
@@ -1326,7 +1413,17 @@ TEST_CASE("SubprocessManager.MetricsSampling")
CHECK(Snapshot.size() == 1);
// Let it finish
- IoContext.run_for(3s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 10'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
}
@@ -1375,12 +1472,31 @@ TEST_CASE("SubprocessManager.KillAndWaitForExit")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; });
// Let it start
- IoContext.run_for(200ms);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Proc->IsRunning())
+ {
+ break;
+ }
+ }
+ }
Proc->Kill();
- IoContext.run_for(2s);
-
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
}
@@ -1401,7 +1517,17 @@ TEST_CASE("SubprocessManager.AdoptProcess")
Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (ReceivedExitCode != -1)
+ {
+ break;
+ }
+ }
+ }
CHECK(ReceivedExitCode == 7);
}
@@ -1424,7 +1550,17 @@ TEST_CASE("SubprocessManager.UserTag")
Proc->SetTag("my-worker-1");
CHECK(Proc->GetTag() == "my-worker-1");
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (!ReceivedTag.empty())
+ {
+ break;
+ }
+ }
+ }
CHECK(ReceivedTag == "my-worker-1");
}
@@ -1454,7 +1590,17 @@ TEST_CASE("ProcessGroup.SpawnAndMembership")
CHECK(Group->GetProcessCount() == 2);
CHECK(Manager.GetProcessCount() == 2);
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (ExitCount == 2)
+ {
+ break;
+ }
+ }
+ }
CHECK(ExitCount == 2);
}
@@ -1504,7 +1650,17 @@ TEST_CASE("ProcessGroup.AggregateMetrics")
Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
// Wait for metrics sampling
- IoContext.run_for(1s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Group->GetAggregateMetrics().TotalWorkingSetSize > 0)
+ {
+ break;
+ }
+ }
+ }
AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics();
CHECK(GroupAgg.ProcessCount == 2);
@@ -1570,7 +1726,17 @@ TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped")
CHECK(Group->GetProcessCount() == 2);
CHECK(Manager.GetProcessCount() == 3);
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (GroupExitCount == 2 && UngroupedExitCode != -1)
+ {
+ break;
+ }
+ }
+ }
CHECK(GroupExitCount == 2);
CHECK(UngroupedExitCode == 0);