diff options
| author | Stefan Boberg <[email protected]> | 2026-03-30 15:07:08 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-30 15:07:08 +0200 |
| commit | 3540d676733efaddecf504b30e9a596465bd43f8 (patch) | |
| tree | 7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zencompute/runners/managedrunner.cpp | |
| parent | include rawHash in structure output for builds ls command (#903) (diff) | |
| download | zen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz zen-3540d676733efaddecf504b30e9a596465bd43f8.zip | |
Request validation and resilience improvements (#864)
### Security: Input validation & path safety
- **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected)
- **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching
- **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads
- **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters
- **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data
- **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes
- **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size
### Reliability: Lock consolidation & bug fixes
- **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult`
- **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity"
- **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\server\share` now correctly becomes `\?\UNC\server\share` instead of `\?\server\share`
- **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries"
### New: ManagedProcessRunner
- Cross-platform process runner backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler
- `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown
- `--managed` flag on `zen exec inproc` to select this runner
- Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor
### Process management
- **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread
- **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution
- **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation
- Consistent PID logging across all runner types
### Test infrastructure
- **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions
- **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue
- **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed)
- **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection
- **UNC path tests** for `MakeSafeAbsolutePath`
### Misc
- ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc.
- Generic `fmt::formatter` for types with free `ToString` functions
- Compute dashboard: truncated hash display with monospace font and hover for full value
- Renamed `usonpackage_forcelink` → `cbpackage_forcelink`
- Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zencompute/runners/managedrunner.cpp')
| -rw-r--r-- | src/zencompute/runners/managedrunner.cpp | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp new file mode 100644 index 000000000..e4a7ba388 --- /dev/null +++ b/src/zencompute/runners/managedrunner.cpp @@ -0,0 +1,279 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "managedrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/except_fmt.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/scopeguard.h> +# include <zencore/timer.h> +# include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <asio/io_context.hpp> +# include <asio/executor_work_guard.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::compute { + +using namespace std::literals; + +ManagedProcessRunner::ManagedProcessRunner(ChunkResolver& Resolver, + const std::filesystem::path& BaseDir, + DeferredDirectoryDeleter& Deleter, + WorkerThreadPool& WorkerPool, + int32_t MaxConcurrentActions) +: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions) +, m_IoContext(std::make_unique<asio::io_context>()) +, m_SubprocessManager(std::make_unique<SubprocessManager>(*m_IoContext)) +{ + m_ProcessGroup = m_SubprocessManager->CreateGroup("compute-workers"); + + // Run the io_context on a small thread pool so that exit callbacks and + // metrics sampling are dispatched without blocking each other. + for (int i = 0; i < kIoThreadCount; ++i) + { + m_IoThreads.emplace_back([this, i] { + SetCurrentThreadName(fmt::format("mrunner_{}", i)); + + // work_guard keeps run() alive even when there is no pending work yet + auto WorkGuard = asio::make_work_guard(*m_IoContext); + + m_IoContext->run(); + }); + } +} + +ManagedProcessRunner::~ManagedProcessRunner() +{ + try + { + Shutdown(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception during managed process runner shutdown: {}", Ex.what()); + } +} + +void +ManagedProcessRunner::Shutdown() +{ + ZEN_TRACE_CPU("ManagedProcessRunner::Shutdown"); + m_AcceptNewActions = false; + + CancelRunningActions(); + + // Tear down the SubprocessManager before stopping the io_context so that + // any in-flight callbacks are drained cleanly. + if (m_SubprocessManager) + { + m_SubprocessManager->DestroyGroup("compute-workers"); + m_ProcessGroup = nullptr; + m_SubprocessManager.reset(); + } + + if (m_IoContext) + { + m_IoContext->stop(); + } + + for (std::thread& Thread : m_IoThreads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_IoThreads.clear(); +} + +SubmitResult +ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::SubmitAction"); + std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action); + + if (!Prepared) + { + return SubmitResult{.IsAccepted = false}; + } + + CbObject WorkerDescription = Prepared->WorkerPackage.GetObject(); + + // Parse environment variables from worker descriptor ("KEY=VALUE" strings) + // into the key-value pairs expected by CreateProcOptions. + std::vector<std::pair<std::string, std::string>> EnvPairs; + for (auto& It : WorkerDescription["environment"sv]) + { + std::string_view Str = It.AsString(); + size_t Eq = Str.find('='); + if (Eq != std::string_view::npos) + { + EnvPairs.emplace_back(std::string(Str.substr(0, Eq)), std::string(Str.substr(Eq + 1))); + } + } + + // Build command line + std::string_view ExecPath = WorkerDescription["path"sv].AsString(); + std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred(); + + std::string CommandLine = fmt::format("\"{}\" -Build=build.action"sv, ExePath.string()); + + ZEN_DEBUG("Executing (managed): '{}' (sandbox='{}')", CommandLine, Prepared->SandboxPath); + + CreateProcOptions Options; + Options.WorkingDirectory = &Prepared->SandboxPath; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.Environment = std::move(EnvPairs); + + const int32_t ActionLsn = Prepared->ActionLsn; + + ManagedProcess* Proc = nullptr; + + try + { + Proc = m_ProcessGroup->Spawn(ExePath, CommandLine, Options, [this, ActionLsn](ManagedProcess& /*Process*/, int ExitCode) { + OnProcessExit(ActionLsn, ExitCode); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed to spawn process for action LSN {}: {}", ActionLsn, Ex.what()); + m_DeferredDeleter.Enqueue(ActionLsn, std::move(Prepared->SandboxPath)); + return SubmitResult{.IsAccepted = false}; + } + + { + Ref<RunningAction> NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = static_cast<void*>(Proc); + NewAction->Pid = Proc->Pid(); + NewAction->SandboxPath = std::move(Prepared->SandboxPath); + + RwLock::ExclusiveLockScope _(m_RunningLock); + m_RunningMap[ActionLsn] = std::move(NewAction); + } + + Action->SetActionState(RunnerAction::State::Running); + + ZEN_DEBUG("Managed runner: action LSN {} -> PID {}", ActionLsn, Proc->Pid()); + + return SubmitResult{.IsAccepted = true}; +} + +void +ManagedProcessRunner::OnProcessExit(int ActionLsn, int ExitCode) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::OnProcessExit"); + + Ref<RunningAction> Running; + + m_RunningLock.WithExclusiveLock([&] { + auto It = m_RunningMap.find(ActionLsn); + if (It != m_RunningMap.end()) + { + Running = std::move(It->second); + m_RunningMap.erase(It); + } + }); + + if (!Running) + { + return; + } + + ZEN_DEBUG("Managed runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), ActionLsn, Running->Pid, ExitCode); + + Running->ExitCode = ExitCode; + + // Capture final CPU metrics from the managed process before it is removed. + auto* Proc = static_cast<ManagedProcess*>(Running->ProcessHandle); + if (Proc) + { + ProcessMetrics Metrics = Proc->GetLatestMetrics(); + float CpuMs = static_cast<float>(Metrics.UserTimeMs + Metrics.KernelTimeMs); + Running->Action->CpuSeconds.store(CpuMs / 1000.0f, std::memory_order_relaxed); + + float CpuPct = Proc->GetCpuUsagePercent(); + if (CpuPct >= 0.0f) + { + Running->Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed); + } + } + + Running->ProcessHandle = nullptr; + + std::vector<Ref<RunningAction>> CompletedActions; + CompletedActions.push_back(std::move(Running)); + ProcessCompletedActions(CompletedActions); +} + +void +ManagedProcessRunner::CancelRunningActions() +{ + ZEN_TRACE_CPU("ManagedProcessRunner::CancelRunningActions"); + + std::unordered_map<int, Ref<RunningAction>> RunningMap; + m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); + + if (RunningMap.empty()) + { + return; + } + + ZEN_INFO("cancelling {} running actions via process group", RunningMap.size()); + + Stopwatch Timer; + + // Kill all processes in the group atomically (TerminateJobObject on Windows, + // SIGTERM+SIGKILL on POSIX). + if (m_ProcessGroup) + { + m_ProcessGroup->KillAll(); + } + + for (auto& [Lsn, Running] : RunningMap) + { + m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath)); + Running->Action->SetActionState(RunnerAction::State::Failed); + } + + ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +bool +ManagedProcessRunner::CancelAction(int ActionLsn) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::CancelAction"); + + ManagedProcess* Proc = nullptr; + + m_RunningLock.WithSharedLock([&] { + auto It = m_RunningMap.find(ActionLsn); + if (It != m_RunningMap.end() && It->second->ProcessHandle != nullptr) + { + Proc = static_cast<ManagedProcess*>(It->second->ProcessHandle); + } + }); + + if (!Proc) + { + return false; + } + + // Terminate the process. The exit callback will handle the rest + // (remove from running map, gather outputs or mark failed). + Proc->Terminate(222); + + ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn); + return true; +} + +} // namespace zen::compute + +#endif |