// Copyright Epic Games, Inc. All Rights Reserved.

// Implementation of ManagedProcessRunner: a LocalProcessRunner that spawns worker
// processes through a SubprocessManager process group and receives their exit
// notifications on a set of io_context threads.
//
// NOTE(review): in this copy of the file the text between angle brackets appears to
// be missing (the include targets below are empty, and template arguments are absent
// on make_unique / static_cast / Ref / std::optional / std::vector /
// std::unordered_map). Restore them from the canonical file before building.

#include "managedrunner.h"

#if ZEN_WITH_COMPUTE_SERVICES

# include
# include
# include
# include
# include
# include
# include
# include

ZEN_THIRD_PARTY_INCLUDES_START
# include
# include
ZEN_THIRD_PARTY_INCLUDES_END

namespace zen::compute {

using namespace std::literals;

// Constructs the runner.
//
// All five arguments are forwarded unchanged to the LocalProcessRunner base.
// The constructor then creates the io context and the subprocess manager bound
// to it, creates the "compute-workers" process group, and starts kIoThreadCount
// threads that each call run() on the io context.
ManagedProcessRunner::ManagedProcessRunner(ChunkResolver& Resolver,
                                           const std::filesystem::path& BaseDir,
                                           DeferredDirectoryDeleter& Deleter,
                                           WorkerThreadPool& WorkerPool,
                                           int32_t MaxConcurrentActions)
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_IoContext(std::make_unique())
, m_SubprocessManager(std::make_unique(*m_IoContext))
{
    m_ProcessGroup = m_SubprocessManager->CreateGroup("compute-workers");

    // Run the io_context on a small thread pool so that exit callbacks and
    // metrics sampling are dispatched without blocking each other.
    for (int i = 0; i < kIoThreadCount; ++i)
    {
        // The lambda captures `this`; Shutdown() joins these threads, and the
        // destructor calls Shutdown().
        m_IoThreads.emplace_back([this, i] {
            SetCurrentThreadName(fmt::format("mrunner_{}", i));

            // work_guard keeps run() alive even when there is no pending work yet
            auto WorkGuard = asio::make_work_guard(*m_IoContext);
            m_IoContext->run();
        });
    }
}

// Calls Shutdown(). A std::exception thrown by Shutdown() is logged as a warning
// and not propagated out of the destructor.
ManagedProcessRunner::~ManagedProcessRunner()
{
    try
    {
        Shutdown();
    }
    catch (std::exception& Ex)
    {
        ZEN_WARN("exception during managed process runner shutdown: {}", Ex.what());
    }
}

// Stops the runner: clears m_AcceptNewActions, cancels all running actions,
// destroys the process group and the subprocess manager, stops the io context,
// and joins and clears the io threads.
//
// Each teardown step is guarded by a null/joinable check, and the thread list is
// cleared at the end.
void ManagedProcessRunner::Shutdown()
{
    ZEN_TRACE_CPU("ManagedProcessRunner::Shutdown");
    m_AcceptNewActions = false;

    CancelRunningActions();

    // Tear down the SubprocessManager before stopping the io_context so that
    // any in-flight callbacks are drained cleanly.
    if (m_SubprocessManager)
    {
        m_SubprocessManager->DestroyGroup("compute-workers");
        // m_ProcessGroup was obtained from CreateGroup("compute-workers"); clear it
        // now that the group has been destroyed.
        m_ProcessGroup = nullptr;
        m_SubprocessManager.reset();
    }

    if (m_IoContext)
    {
        m_IoContext->stop();
    }

    for (std::thread& Thread : m_IoThreads)
    {
        if (Thread.joinable())
        {
            Thread.join();
        }
    }
    m_IoThreads.clear();
}

// Prepares and launches a worker process for Action.
//
// Returns SubmitResult{.IsAccepted = false} when PrepareActionSubmission() yields
// no value, or when Spawn() throws a std::exception (in which case the error is
// logged and the sandbox path is handed to m_DeferredDeleter).
// Otherwise a RunningAction is stored in m_RunningMap under the action's LSN, the
// action state is set to Running, and SubmitResult{.IsAccepted = true} is returned.
SubmitResult ManagedProcessRunner::SubmitAction(Ref Action)
{
    ZEN_TRACE_CPU("ManagedProcessRunner::SubmitAction");

    std::optional Prepared = PrepareActionSubmission(Action);
    if (!Prepared)
    {
        return SubmitResult{.IsAccepted = false};
    }

    CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();

    // Parse environment variables from worker descriptor ("KEY=VALUE" strings)
    // into the key-value pairs expected by CreateProcOptions.
    // Entries that contain no '=' are skipped; the split is on the first '='.
    std::vector> EnvPairs;
    for (auto& It : WorkerDescription["environment"sv])
    {
        std::string_view Str = It.AsString();
        size_t Eq = Str.find('=');
        if (Eq != std::string_view::npos)
        {
            EnvPairs.emplace_back(std::string(Str.substr(0, Eq)), std::string(Str.substr(Eq + 1)));
        }
    }

    // Build command line
    // The executable is the descriptor's "path" entry resolved against the worker
    // path; the command line is the quoted executable followed by a fixed
    // -Build=build.action argument.
    std::string_view ExecPath = WorkerDescription["path"sv].AsString();
    std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred();
    std::string CommandLine = fmt::format("\"{}\" -Build=build.action"sv, ExePath.string());

    ZEN_DEBUG("Executing (managed): '{}' (sandbox='{}')", CommandLine, Prepared->SandboxPath);

    CreateProcOptions Options;
    // Options holds a pointer into Prepared; Prepared stays alive until this
    // function returns.
    Options.WorkingDirectory = &Prepared->SandboxPath;
    Options.Flags = CreateProcOptions::Flag_NoConsole;
    Options.Environment = std::move(EnvPairs);

    const int32_t ActionLsn = Prepared->ActionLsn;

    ManagedProcess* Proc = nullptr;

    try
    {
        // The exit callback captures `this` and the LSN by value and forwards to
        // OnProcessExit().
        // NOTE(review): the callback is registered here, before the action is
        // inserted into m_RunningMap below, and OnProcessExit() returns early for
        // an LSN it cannot find - confirm the callback cannot run before the
        // insertion.
        Proc = m_ProcessGroup->Spawn(ExePath, CommandLine, Options, [this, ActionLsn](ManagedProcess& /*Process*/, int ExitCode) {
            OnProcessExit(ActionLsn, ExitCode);
        });
    }
    catch (std::exception& Ex)
    {
        ZEN_ERROR("Failed to spawn process for action LSN {}: {}", ActionLsn, Ex.what());
        m_DeferredDeleter.Enqueue(ActionLsn, std::move(Prepared->SandboxPath));
        return SubmitResult{.IsAccepted = false};
    }

    {
        // The RunningAction is fully populated before the exclusive lock is taken;
        // only the map insertion happens under the lock.
        Ref NewAction{new RunningAction()};
        NewAction->Action = Action;
        NewAction->ProcessHandle = static_cast(Proc);
        NewAction->Pid = Proc->Pid();
        NewAction->SandboxPath = std::move(Prepared->SandboxPath);

        RwLock::ExclusiveLockScope _(m_RunningLock);
        m_RunningMap[ActionLsn] = std::move(NewAction);
    }

    Action->SetActionState(RunnerAction::State::Running);

    ZEN_DEBUG("Managed runner: action LSN {} -> PID {}", ActionLsn, Proc->Pid());

    return SubmitResult{.IsAccepted = true};
}

// Exit handler invoked from the callback registered in SubmitAction().
//
// Removes the RunningAction for ActionLsn from m_RunningMap under the exclusive
// lock. If no entry exists the function returns without doing anything (this is
// the case, for example, after CancelRunningActions() has swapped the map out).
// Otherwise it records the exit code and CPU metrics, clears the process handle,
// and passes the action to ProcessCompletedActions().
void ManagedProcessRunner::OnProcessExit(int ActionLsn, int ExitCode)
{
    ZEN_TRACE_CPU("ManagedProcessRunner::OnProcessExit");

    Ref Running;

    m_RunningLock.WithExclusiveLock([&] {
        auto It = m_RunningMap.find(ActionLsn);
        if (It != m_RunningMap.end())
        {
            Running = std::move(It->second);
            m_RunningMap.erase(It);
        }
    });

    if (!Running)
    {
        return;
    }

    ZEN_DEBUG("Managed runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), ActionLsn, Running->Pid, ExitCode);

    Running->ExitCode = ExitCode;

    // Capture final CPU metrics from the managed process before it is removed.
    auto* Proc = static_cast(Running->ProcessHandle);
    if (Proc)
    {
        // CpuSeconds is the sum of user and kernel time, converted from
        // milliseconds to seconds.
        ProcessMetrics Metrics = Proc->GetLatestMetrics();
        float CpuMs = static_cast(Metrics.UserTimeMs + Metrics.KernelTimeMs);
        Running->Action->CpuSeconds.store(CpuMs / 1000.0f, std::memory_order_relaxed);

        // Only non-negative values are stored.
        // NOTE(review): a negative result presumably means "not available" - confirm.
        float CpuPct = Proc->GetCpuUsagePercent();
        if (CpuPct >= 0.0f)
        {
            Running->Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
        }
    }

    // Drop the handle before handing the action on.
    Running->ProcessHandle = nullptr;

    std::vector> CompletedActions;
    CompletedActions.push_back(std::move(Running));
    ProcessCompletedActions(CompletedActions);
}

// Cancels every action currently in m_RunningMap.
//
// The map is swapped into a local under the exclusive lock so the remaining work
// runs without the lock held. If the local map is empty the function returns.
// Otherwise all processes in the group are killed (when a group exists), and for
// each action the sandbox path is queued for deferred deletion and the action
// state is set to Failed.
void ManagedProcessRunner::CancelRunningActions()
{
    ZEN_TRACE_CPU("ManagedProcessRunner::CancelRunningActions");

    std::unordered_map> RunningMap;

    m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });

    if (RunningMap.empty())
    {
        return;
    }

    ZEN_INFO("cancelling {} running actions via process group", RunningMap.size());

    Stopwatch Timer;

    // Kill all processes in the group atomically (TerminateJobObject on Windows,
    // SIGTERM+SIGKILL on POSIX).
    if (m_ProcessGroup)
    {
        m_ProcessGroup->KillAll();
    }

    for (auto& [Lsn, Running] : RunningMap)
    {
        m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
        Running->Action->SetActionState(RunnerAction::State::Failed);
    }

    ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}

// Requests termination of the process running the action with the given LSN.
//
// Returns false when the LSN is not in m_RunningMap or its ProcessHandle is null;
// otherwise calls Terminate(222) on the process and returns true.
bool ManagedProcessRunner::CancelAction(int ActionLsn)
{
    ZEN_TRACE_CPU("ManagedProcessRunner::CancelAction");

    ManagedProcess* Proc = nullptr;

    m_RunningLock.WithSharedLock([&] {
        auto It = m_RunningMap.find(ActionLsn);
        if (It != m_RunningMap.end() && It->second->ProcessHandle != nullptr)
        {
            Proc = static_cast(It->second->ProcessHandle);
        }
    });

    if (!Proc)
    {
        return false;
    }

    // Terminate the process. The exit callback will handle the rest
    // (remove from running map, gather outputs or mark failed).
    // NOTE(review): Proc is dereferenced after the shared lock has been released -
    // confirm the ManagedProcess cannot be destroyed in between.
    // 222 is the value passed to Terminate(); its meaning is not visible here.
    Proc->Terminate(222);

    ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn);
    return true;
}

}  // namespace zen::compute

#endif