diff options
| author | Stefan Boberg <[email protected]> | 2026-03-30 15:07:08 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-30 15:07:08 +0200 |
| commit | 3540d676733efaddecf504b30e9a596465bd43f8 (patch) | |
| tree | 7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zencompute | |
| parent | include rawHash in structure output for builds ls command (#903) (diff) | |
| download | zen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz zen-3540d676733efaddecf504b30e9a596465bd43f8.zip | |
Request validation and resilience improvements (#864)
### Security: Input validation & path safety
- **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected)
- **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching
- **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads
- **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters
- **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data
- **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes
- **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size
### Reliability: Lock consolidation & bug fixes
- **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult`
- **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity"
- **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\server\share` now correctly becomes `\?\UNC\server\share` instead of `\?\server\share`
- **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries"
### New: ManagedProcessRunner
- Cross-platform process runner backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler
- `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown
- `--managed` flag on `zen exec inproc` to select this runner
- Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor
### Process management
- **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread
- **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution
- **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation
- Consistent PID logging across all runner types
### Test infrastructure
- **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions
- **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue
- **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed)
- **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection
- **UNC path tests** for `MakeSafeAbsolutePath`
### Misc
- ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc.
- Generic `fmt::formatter` for types with free `ToString` functions
- Compute dashboard: truncated hash display with monospace font and hover for full value
- Renamed `usonpackage_forcelink` → `cbpackage_forcelink`
- Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zencompute')
| -rw-r--r-- | src/zencompute/CLAUDE.md | 10 | ||||
| -rw-r--r-- | src/zencompute/computeservice.cpp | 257 | ||||
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 104 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/computeservice.h | 1 | ||||
| -rw-r--r-- | src/zencompute/pathvalidation.h | 118 | ||||
| -rw-r--r-- | src/zencompute/runners/functionrunner.cpp | 12 | ||||
| -rw-r--r-- | src/zencompute/runners/linuxrunner.cpp | 2 | ||||
| -rw-r--r-- | src/zencompute/runners/localrunner.cpp | 17 | ||||
| -rw-r--r-- | src/zencompute/runners/localrunner.h | 4 | ||||
| -rw-r--r-- | src/zencompute/runners/macrunner.cpp | 2 | ||||
| -rw-r--r-- | src/zencompute/runners/managedrunner.cpp | 279 | ||||
| -rw-r--r-- | src/zencompute/runners/managedrunner.h | 64 | ||||
| -rw-r--r-- | src/zencompute/runners/windowsrunner.cpp | 99 | ||||
| -rw-r--r-- | src/zencompute/runners/windowsrunner.h | 1 | ||||
| -rw-r--r-- | src/zencompute/runners/winerunner.cpp | 2 |
15 files changed, 767 insertions, 205 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md index a1a39fc3c..750879d5a 100644 --- a/src/zencompute/CLAUDE.md +++ b/src/zencompute/CLAUDE.md @@ -141,7 +141,7 @@ Actions that fail or are abandoned can be automatically retried or manually resc **Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure. -**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Both automatic and manual paths respect this limit. +**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Setting `max_retries=0` disables automatic retry entirely; omitting the field (or setting it to a negative value) uses the default of 3. Both automatic and manual paths respect this limit. **Retraction (API path):** `RetractAction(Lsn)` pulls a Pending/Submitting/Running action back for rescheduling on a different runner. The action transitions to Retracted, then `ResetActionStateToPending()` is called *without* incrementing `RetryCount`. Retraction is idempotent. @@ -156,7 +156,7 @@ Queues group actions from a single client session. A `QueueEntry` (internal) tra - `ActiveLsns` — for cancellation lookup (under `m_Lock`) - `FinishedLsns` — moved here when actions complete - `IdleSince` — used for 15-minute automatic expiry -- `Config` — CbObject set at creation; supports `max_retries` (int) to override the default retry limit +- `Config` — CbObject set at creation; supports `max_retries` (int, default 3) to override the default retry limit. `0` = no retries, negative or absent = use default **Queue state machine (`QueueState` enum):** ``` @@ -216,11 +216,7 @@ Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `Han ## Concurrency Model -**Locking discipline:** When multiple locks must be held simultaneously, always acquire in this order to prevent deadlocks: -1. `m_ResultsLock` -2. `m_RunningLock` (comment in localrunner.h: "must be taken *after* m_ResultsLock") -3. `m_PendingLock` -4. `m_QueueLock` +**Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock. **Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`. diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index 92901de64..58761556a 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -8,6 +8,8 @@ # include "recording/actionrecorder.h" # include "runners/localrunner.h" # include "runners/remotehttprunner.h" +# include "runners/managedrunner.h" +# include "pathvalidation.h" # if ZEN_PLATFORM_LINUX # include "runners/linuxrunner.h" # elif ZEN_PLATFORM_WINDOWS @@ -195,13 +197,9 @@ struct ComputeServiceSession::Impl std::atomic<IComputeCompletionObserver*> m_CompletionObserver{nullptr}; - RwLock m_PendingLock; - std::map<int, Ref<RunnerAction>> m_PendingActions; - - RwLock m_RunningLock; + RwLock m_ActionMapLock; // Guards m_PendingActions, m_RunningMap, m_ResultsMap + std::map<int, Ref<RunnerAction>> m_PendingActions; std::unordered_map<int, Ref<RunnerAction>> m_RunningMap; - - RwLock m_ResultsLock; std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap; metrics::Meter m_ResultRate; std::atomic<uint64_t> m_RetiredCount{0}; @@ -343,9 +341,12 @@ struct ComputeServiceSession::Impl ActionCounts GetActionCounts() { ActionCounts Counts; - Counts.Pending = (int)m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); - Counts.Running = (int)m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); - Counts.Completed = (int)m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }) + (int)m_RetiredCount.load(); + m_ActionMapLock.WithSharedLock([&] { + Counts.Pending = (int)m_PendingActions.size(); + Counts.Running = (int)m_RunningMap.size(); + Counts.Completed = (int)m_ResultsMap.size(); + }); + Counts.Completed += (int)m_RetiredCount.load(); Counts.ActiveQueues = (int)m_QueueLock.WithSharedLock([&] { size_t Count = 0; for (const auto& [Id, Queue] : m_Queues) @@ -364,8 +365,10 @@ struct ComputeServiceSession::Impl { Cbo << "session_state"sv << ToString(m_SessionState.load(std::memory_order_relaxed)); m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); - m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); - m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); + m_ActionMapLock.WithSharedLock([&] { + Cbo << "actions_complete"sv << m_ResultsMap.size(); + Cbo << "actions_pending"sv << m_PendingActions.size(); + }); Cbo << "actions_submitted"sv << GetSubmittedActionCount(); EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo); EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); @@ -450,27 +453,17 @@ ComputeServiceSession::Impl::RequestStateTransition(SessionState NewState) void ComputeServiceSession::Impl::AbandonAllActions() { - // Collect all pending actions and mark them as Abandoned + // Collect all pending and running actions under a single lock scope std::vector<Ref<RunnerAction>> PendingToAbandon; + std::vector<Ref<RunnerAction>> RunningToAbandon; - m_PendingLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { PendingToAbandon.reserve(m_PendingActions.size()); for (auto& [Lsn, Action] : m_PendingActions) { PendingToAbandon.push_back(Action); } - }); - - for (auto& Action : PendingToAbandon) - { - Action->SetActionState(RunnerAction::State::Abandoned); - } - // Collect all running actions and mark them as Abandoned, then - // best-effort cancel via the local runner group - std::vector<Ref<RunnerAction>> RunningToAbandon; - - m_RunningLock.WithSharedLock([&] { RunningToAbandon.reserve(m_RunningMap.size()); for (auto& [Lsn, Action] : m_RunningMap) { @@ -478,6 +471,11 @@ ComputeServiceSession::Impl::AbandonAllActions() } }); + for (auto& Action : PendingToAbandon) + { + Action->SetActionState(RunnerAction::State::Abandoned); + } + for (auto& Action : RunningToAbandon) { Action->SetActionState(RunnerAction::State::Abandoned); @@ -742,7 +740,7 @@ std::vector<ComputeServiceSession::RunningActionInfo> ComputeServiceSession::Impl::GetRunningActions() { std::vector<ComputeServiceSession::RunningActionInfo> Result; - m_RunningLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { Result.reserve(m_RunningMap.size()); for (const auto& [Lsn, Action] : m_RunningMap) { @@ -810,6 +808,11 @@ void ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker) { ZEN_TRACE_CPU("ComputeServiceSession::RegisterWorker"); + + // Validate all paths in the worker description upfront, before the worker is + // distributed to runners. This rejects malicious packages early at ingestion time. + ValidateWorkerDescriptionPaths(Worker.GetObject()); + RwLock::ExclusiveLockScope _(m_WorkerLock); const IoHash& WorkerId = Worker.GetObject().GetHash(); @@ -994,10 +997,15 @@ ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worke Pending->ActionObj = ActionObj; Pending->Priority = RequestPriority; - // For now simply put action into pending state, so we can do batch scheduling + // Insert into the pending map immediately so the action is visible to + // FindActionResult/GetActionResult right away. SetActionState will call + // PostUpdate which adds the action to m_UpdatedActions and signals the + // scheduler, but the scheduler's HandleActionUpdates inserts with + // std::map::insert which is a no-op for existing keys. ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); + m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Pending}); }); Pending->SetActionState(RunnerAction::State::Pending); if (m_Recorder) @@ -1043,11 +1051,7 @@ ComputeServiceSession::Impl::GetSubmittedActionCount() HttpResponseCode ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) { - // This lock is held for the duration of the function since we need to - // be sure that the action doesn't change state while we are checking the - // different data structures - - RwLock::ExclusiveLockScope _(m_ResultsLock); + RwLock::ExclusiveLockScope _(m_ActionMapLock); if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) { @@ -1058,25 +1062,14 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult return HttpResponseCode::OK; } + if (m_PendingActions.find(ActionLsn) != m_PendingActions.end()) { - RwLock::SharedLockScope __(m_PendingLock); - - if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) - { - return HttpResponseCode::Accepted; - } + return HttpResponseCode::Accepted; } - // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must - // always be taken after m_ResultsLock if both are needed - + if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) { - RwLock::SharedLockScope __(m_RunningLock); - - if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) - { - return HttpResponseCode::Accepted; - } + return HttpResponseCode::Accepted; } return HttpResponseCode::NotFound; @@ -1085,11 +1078,7 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult HttpResponseCode ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) { - // This lock is held for the duration of the function since we need to - // be sure that the action doesn't change state while we are checking the - // different data structures - - RwLock::ExclusiveLockScope _(m_ResultsLock); + RwLock::ExclusiveLockScope _(m_ActionMapLock); for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) { @@ -1103,30 +1092,19 @@ ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& } } + for (const auto& [K, Pending] : m_PendingActions) { - RwLock::SharedLockScope __(m_PendingLock); - - for (const auto& [K, Pending] : m_PendingActions) + if (Pending->ActionId == ActionId) { - if (Pending->ActionId == ActionId) - { - return HttpResponseCode::Accepted; - } + return HttpResponseCode::Accepted; } } - // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must - // always be taken after m_ResultsLock if both are needed - + for (const auto& [K, v] : m_RunningMap) { - RwLock::SharedLockScope __(m_RunningLock); - - for (const auto& [K, v] : m_RunningMap) + if (v->ActionId == ActionId) { - if (v->ActionId == ActionId) - { - return HttpResponseCode::Accepted; - } + return HttpResponseCode::Accepted; } } @@ -1144,7 +1122,7 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo) { Cbo.BeginArray("completed"); - m_ResultsLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { for (auto& [Lsn, Action] : m_ResultsMap) { Cbo.BeginObject(); @@ -1275,20 +1253,14 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId) std::vector<Ref<RunnerAction>> PendingActionsToCancel; std::vector<int> RunningLsnsToCancel; - m_PendingLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { for (int Lsn : LsnsToCancel) { if (auto It = m_PendingActions.find(Lsn); It != m_PendingActions.end()) { PendingActionsToCancel.push_back(It->second); } - } - }); - - m_RunningLock.WithSharedLock([&] { - for (int Lsn : LsnsToCancel) - { - if (m_RunningMap.find(Lsn) != m_RunningMap.end()) + else if (m_RunningMap.find(Lsn) != m_RunningMap.end()) { RunningLsnsToCancel.push_back(Lsn); } @@ -1307,7 +1279,7 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId) // transition from the runner is blocked (Cancelled > Failed in the enum). for (int Lsn : RunningLsnsToCancel) { - m_RunningLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { if (auto It = m_RunningMap.find(Lsn); It != m_RunningMap.end()) { It->second->SetActionState(RunnerAction::State::Cancelled); @@ -1445,7 +1417,7 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo) if (Queue) { Queue->m_Lock.WithSharedLock([&] { - m_ResultsLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { for (int Lsn : Queue->FinishedLsns) { if (m_ResultsMap.contains(Lsn)) @@ -1541,9 +1513,15 @@ ComputeServiceSession::Impl::SchedulePendingActions() { ZEN_TRACE_CPU("ComputeServiceSession::SchedulePendingActions"); int ScheduledCount = 0; - size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); - size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); - size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); + size_t RunningCount = 0; + size_t PendingCount = 0; + size_t ResultCount = 0; + + m_ActionMapLock.WithSharedLock([&] { + RunningCount = m_RunningMap.size(); + PendingCount = m_PendingActions.size(); + ResultCount = m_ResultsMap.size(); + }); static Stopwatch DumpRunningTimer; @@ -1560,7 +1538,7 @@ ComputeServiceSession::Impl::SchedulePendingActions() DumpRunningTimer.Reset(); std::set<int> RunningList; - m_RunningLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { for (auto& [K, V] : m_RunningMap) { RunningList.insert(K); @@ -1602,7 +1580,7 @@ ComputeServiceSession::Impl::SchedulePendingActions() // Also note that the m_PendingActions list is not maintained // here, that's done periodically in SchedulePendingActions() - m_PendingLock.WithExclusiveLock([&] { + m_ActionMapLock.WithExclusiveLock([&] { if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) { return; @@ -1701,7 +1679,7 @@ ComputeServiceSession::Impl::SchedulerThreadFunction() { int TimeoutMs = 500; - auto PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); + auto PendingCount = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.size(); }); if (PendingCount) { @@ -1720,22 +1698,22 @@ ComputeServiceSession::Impl::SchedulerThreadFunction() m_SchedulingThreadEvent.Reset(); } - ZEN_DEBUG("compute scheduler TICK (Pending: {} was {}, Running: {}, Results: {}) timeout: {}", - m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }), - PendingCount, - m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }), - m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }), - TimeoutMs); + m_ActionMapLock.WithSharedLock([&] { + ZEN_DEBUG("compute scheduler TICK (Pending: {}, Running: {}, Results: {}) timeout: {}", + m_PendingActions.size(), + m_RunningMap.size(), + m_ResultsMap.size(), + TimeoutMs); + }); HandleActionUpdates(); // Auto-transition Draining → Paused when all work is done if (m_SessionState.load(std::memory_order_relaxed) == SessionState::Draining) { - size_t Pending = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); - size_t Running = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); + bool AllDrained = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.empty() && m_RunningMap.empty(); }); - if (Pending == 0 && Running == 0) + if (AllDrained) { SessionState Expected = SessionState::Draining; if (m_SessionState.compare_exchange_strong(Expected, SessionState::Paused, std::memory_order_acq_rel)) @@ -1776,9 +1754,9 @@ ComputeServiceSession::Impl::GetMaxRetriesForQueue(int QueueId) if (Config) { - int Value = Config["max_retries"].AsInt32(0); + int Value = Config["max_retries"].AsInt32(-1); - if (Value > 0) + if (Value >= 0) { return Value; } @@ -1797,7 +1775,7 @@ ComputeServiceSession::Impl::RescheduleAction(int ActionLsn) // Find, validate, and remove atomically under a single lock scope to prevent // concurrent RescheduleAction calls from double-removing the same action. - m_ResultsLock.WithExclusiveLock([&] { + m_ActionMapLock.WithExclusiveLock([&] { auto It = m_ResultsMap.find(ActionLsn); if (It == m_ResultsMap.end()) { @@ -1871,26 +1849,20 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn) bool WasRunning = false; // Look for the action in pending or running maps - m_RunningLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { if (auto It = m_RunningMap.find(ActionLsn); It != m_RunningMap.end()) { Action = It->second; WasRunning = true; } + else if (auto PIt = m_PendingActions.find(ActionLsn); PIt != m_PendingActions.end()) + { + Action = PIt->second; + } }); if (!Action) { - m_PendingLock.WithSharedLock([&] { - if (auto It = m_PendingActions.find(ActionLsn); It != m_PendingActions.end()) - { - Action = It->second; - } - }); - } - - if (!Action) - { return {.Success = false, .Error = "Action not found in pending or running maps"}; } @@ -1912,18 +1884,15 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn) void ComputeServiceSession::Impl::RemoveActionFromActiveMaps(int ActionLsn) { - m_RunningLock.WithExclusiveLock([&] { - m_PendingLock.WithExclusiveLock([&] { - if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) - { - m_PendingActions.erase(ActionLsn); - } - else - { - m_RunningMap.erase(FindIt); - } - }); - }); + // Caller must hold m_ActionMapLock exclusively. + if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) + { + m_PendingActions.erase(ActionLsn); + } + else + { + m_RunningMap.erase(FindIt); + } } void @@ -1973,7 +1942,7 @@ ComputeServiceSession::Impl::HandleActionUpdates() } else { - m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); } break; @@ -1983,11 +1952,9 @@ ComputeServiceSession::Impl::HandleActionUpdates() // Dispatched to a runner — move from pending to running case RunnerAction::State::Running: - m_RunningLock.WithExclusiveLock([&] { - m_PendingLock.WithExclusiveLock([&] { - m_RunningMap.insert({ActionLsn, Action}); - m_PendingActions.erase(ActionLsn); - }); + m_ActionMapLock.WithExclusiveLock([&] { + m_RunningMap.insert({ActionLsn, Action}); + m_PendingActions.erase(ActionLsn); }); ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); break; @@ -1995,7 +1962,10 @@ ComputeServiceSession::Impl::HandleActionUpdates() // Retracted — pull back for rescheduling without counting against retry limit case RunnerAction::State::Retracted: { - RemoveActionFromActiveMaps(ActionLsn); + m_ActionMapLock.WithExclusiveLock([&] { + m_RunningMap.erase(ActionLsn); + m_PendingActions[ActionLsn] = Action; + }); Action->ResetActionStateToPending(); ZEN_INFO("action {} ({}) retracted for rescheduling", Action->ActionId, ActionLsn); break; @@ -2019,7 +1989,10 @@ ComputeServiceSession::Impl::HandleActionUpdates() if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries) { - RemoveActionFromActiveMaps(ActionLsn); + m_ActionMapLock.WithExclusiveLock([&] { + m_RunningMap.erase(ActionLsn); + m_PendingActions[ActionLsn] = Action; + }); // Reset triggers PostUpdate() which re-enters the action as Pending Action->ResetActionStateToPending(); @@ -2034,16 +2007,16 @@ ComputeServiceSession::Impl::HandleActionUpdates() } } - RemoveActionFromActiveMaps(ActionLsn); + m_ActionMapLock.WithExclusiveLock([&] { + RemoveActionFromActiveMaps(ActionLsn); - // Update queue counters BEFORE publishing the result into - // m_ResultsMap. GetActionResult erases from m_ResultsMap - // under m_ResultsLock, so if we updated counters after - // releasing that lock, a caller could observe ActiveCount - // still at 1 immediately after GetActionResult returned OK. - NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState); + // Update queue counters BEFORE publishing the result into + // m_ResultsMap. GetActionResult erases from m_ResultsMap + // under m_ActionMapLock, so if we updated counters after + // releasing that lock, a caller could observe ActiveCount + // still at 1 immediately after GetActionResult returned OK. + NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState); - m_ResultsLock.WithExclusiveLock([&] { m_ResultsMap[ActionLsn] = Action; // Append to bounded action history ring @@ -2282,6 +2255,18 @@ ComputeServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::files } void +ComputeServiceSession::AddManagedLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions) +{ + ZEN_TRACE_CPU("ComputeServiceSession::AddManagedLocalRunner"); + + auto* NewRunner = + new ManagedProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, MaxConcurrentActions); + + m_Impl->SyncWorkersToRunner(*NewRunner); + m_Impl->m_LocalRunnerGroup.AddRunner(NewRunner); +} + +void ComputeServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) { ZEN_TRACE_CPU("ComputeServiceSession::AddRemoteRunner"); diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index bdfd9d197..bd3f4e70e 100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -93,13 +93,14 @@ struct HttpComputeService::Impl uint64_t NewBytes = 0; }; - IngestStats IngestPackageAttachments(const CbPackage& Package); - bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList); - void HandleWorkersGet(HttpServerRequest& HttpReq); - void HandleWorkersAllGet(HttpServerRequest& HttpReq); - void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status); - void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId); - void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); + bool IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats); + bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList); + bool ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment); + void HandleWorkersGet(HttpServerRequest& HttpReq); + void HandleWorkersAllGet(HttpServerRequest& HttpReq); + void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status); + void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId); + void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); // WebSocket / observer void OnWebSocketOpen(Ref<WebSocketConnection> Connection); @@ -373,7 +374,7 @@ HttpComputeService::Impl::RegisterRoutes() if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output); ResponseCode != HttpResponseCode::OK) { - ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) + ZEN_DEBUG("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) if (ResponseCode == HttpResponseCode::NotFound) { @@ -1167,35 +1168,81 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin return ParseInt<int>(Capture).value_or(0); } -HttpComputeService::Impl::IngestStats -HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package) +bool +HttpComputeService::Impl::ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment) { - IngestStats Stats; + const IoHash ClaimedHash = Attachment.GetHash(); + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + const IoHash HeaderHash = Buffer.DecodeRawHash(); + + if (HeaderHash != ClaimedHash) + { + ZEN_WARN("attachment header hash mismatch: claimed {} but header contains {}", ClaimedHash, HeaderHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + IoHashStream Hasher; + + bool DecompressOk = Buffer.DecompressToStream( + 0, + Buffer.DecodeRawSize(), + [&](uint64_t /*SourceOffset*/, uint64_t /*SourceSize*/, uint64_t /*Offset*/, const CompositeBuffer& Range) -> bool { + for (const SharedBuffer& Segment : Range.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + return true; + }); + + if (!DecompressOk) + { + ZEN_WARN("attachment {}: failed to decompress", ClaimedHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + const IoHash ActualHash = Hasher.GetHash(); + + if (ActualHash != ClaimedHash) + { + ZEN_WARN("attachment hash mismatch: claimed {} but decompressed data hashes to {}", ClaimedHash, ActualHash); + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + return false; + } + + return true; +} +bool +HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats) +{ for (const CbAttachment& Attachment : Package.GetAttachments()) { ZEN_ASSERT(Attachment.IsCompressedBinary()); - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); + if (!ValidateAttachmentHash(HttpReq, Attachment)) + { + return false; + } - const uint64_t CompressedSize = DataView.GetCompressedSize(); + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + const uint64_t CompressedSize = DataView.GetCompressedSize(); - Stats.Bytes += CompressedSize; - ++Stats.Count; + OutStats.Bytes += CompressedSize; + ++OutStats.Count; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { - Stats.NewBytes += CompressedSize; - ++Stats.NewCount; + OutStats.NewBytes += CompressedSize; + ++OutStats.NewCount; } } - return Stats; + return true; } bool @@ -1253,7 +1300,10 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que { CbPackage Package = HttpReq.ReadPayloadPackage(); Body = Package.GetObject(); - Stats = IngestPackageAttachments(Package); + if (!IngestPackageAttachments(HttpReq, Package, Stats)) + { + return; // validation failed, response already written + } break; } @@ -1268,8 +1318,7 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que { // --- Batch path --- - // For CbObject payloads, check all attachments upfront before enqueuing anything - if (HttpReq.RequestContentType() == HttpContentType::kCbObject) + // Verify all action attachment references exist in the store { std::vector<IoHash> NeedList; @@ -1345,7 +1394,6 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que // --- Single-action path: Body is the action itself --- - if (HttpReq.RequestContentType() == HttpContentType::kCbObject) { std::vector<IoHash> NeedList; @@ -1491,10 +1539,14 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const { ZEN_ASSERT(Attachment.IsCompressedBinary()); + if (!ValidateAttachmentHash(HttpReq, Attachment)) + { + return; + } + const IoHash DataHash = Attachment.GetHash(); CompressedBuffer Buffer = Attachment.AsCompressedBinary(); - ZEN_UNUSED(DataHash); TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h index 1ca78738a..ad556f546 100644 --- a/src/zencompute/include/zencompute/computeservice.h +++ b/src/zencompute/include/zencompute/computeservice.h @@ -167,6 +167,7 @@ public: // Action runners void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions = 0); + void AddManagedLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions = 0); void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName); // Action submission diff --git a/src/zencompute/pathvalidation.h b/src/zencompute/pathvalidation.h new file mode 100644 index 000000000..c2e30183a --- /dev/null +++ b/src/zencompute/pathvalidation.h @@ -0,0 +1,118 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/except_fmt.h> +#include <zencore/string.h> + +#include <filesystem> +#include <string_view> + +namespace zen::compute { + +// Validate that a single path component contains only characters that are valid +// file/directory names on all supported platforms. Uses Windows rules as the most +// restrictive superset, since packages may be built on one platform and consumed +// on another. +inline void +ValidatePathComponent(std::string_view Component, std::string_view FullPath) +{ + // Reject control characters (0x00-0x1F) and characters forbidden on Windows + for (char Ch : Component) + { + if (static_cast<unsigned char>(Ch) < 0x20 || Ch == '<' || Ch == '>' || Ch == ':' || Ch == '"' || Ch == '|' || Ch == '?' || + Ch == '*') + { + throw zen::invalid_argument("invalid character in path component '{}' of '{}'", Component, FullPath); + } + } + + // Reject empty components and trailing dots or spaces (silently stripped on Windows, leading to confusion) + if (Component.empty() || Component.back() == '.' || Component.back() == ' ') + { + throw zen::invalid_argument("path component '{}' of '{}' has trailing dot or space", Component, FullPath); + } + + // Reject Windows reserved device names (CON, PRN, AUX, NUL, COM1-9, LPT1-9) + // These are reserved with or without an extension (e.g. "CON.txt" is still reserved). + std::string_view Stem = Component.substr(0, Component.find('.')); + + static constexpr std::string_view ReservedNames[] = { + "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", + "COM8", "COM9", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9", + }; + + for (std::string_view Reserved : ReservedNames) + { + if (zen::StrCaseCompare(Stem, Reserved) == 0) + { + throw zen::invalid_argument("path component '{}' of '{}' uses reserved device name '{}'", Component, FullPath, Reserved); + } + } +} + +// Validate that a path extracted from a package is a safe relative path. +// Rejects absolute paths, ".." components, and invalid platform filenames. +inline void +ValidateSandboxRelativePath(std::string_view Name) +{ + if (Name.empty()) + { + throw zen::invalid_argument("path traversal detected: empty path name"); + } + + std::filesystem::path Parsed(Name); + + if (Parsed.is_absolute()) + { + throw zen::invalid_argument("path traversal detected: '{}' is an absolute path", Name); + } + + for (const auto& Component : Parsed) + { + std::string ComponentStr = Component.string(); + + if (ComponentStr == "..") + { + throw zen::invalid_argument("path traversal detected: '{}' contains '..' component", Name); + } + + // Skip "." (current directory) — harmless in relative paths + if (ComponentStr != ".") + { + ValidatePathComponent(ComponentStr, Name); + } + } +} + +// Validate all path entries in a worker description CbObject. +// Checks path, executables[].name, dirs[], and files[].name fields. +// Throws an exception if any invalid paths are found. +inline void +ValidateWorkerDescriptionPaths(const CbObject& WorkerDescription) +{ + using namespace std::literals; + + if (auto PathField = WorkerDescription["path"sv]; PathField.HasValue()) + { + ValidateSandboxRelativePath(PathField.AsString()); + } + + for (auto& It : WorkerDescription["executables"sv]) + { + ValidateSandboxRelativePath(It.AsObjectView()["name"sv].AsString()); + } + + for (auto& It : WorkerDescription["dirs"sv]) + { + ValidateSandboxRelativePath(It.AsString()); + } + + for (auto& It : WorkerDescription["files"sv]) + { + ValidateSandboxRelativePath(It.AsObjectView()["name"sv].AsString()); + } +} + +} // namespace zen::compute diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp index 4f116e7d8..67e12b84e 100644 --- a/src/zencompute/runners/functionrunner.cpp +++ b/src/zencompute/runners/functionrunner.cpp @@ -164,8 +164,9 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) } } - // Assign any remaining actions to runners with capacity (round-robin) - for (int i = 0; ActionIdx < Actions.size(); i = (i + 1) % RunnerCount) + // Assign any remaining actions to runners with capacity (round-robin). + // Cap at TotalCapacity to avoid spinning when there are more actions than runners can accept. + for (int i = 0; ActionIdx < Actions.size() && ActionIdx < TotalCapacity; i = (i + 1) % RunnerCount) { if (Capacities[i] > PerRunnerActions[i].size()) { @@ -186,11 +187,12 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) } } - // Reassemble results in original action order - std::vector<SubmitResult> Results(Actions.size()); + // Reassemble results in original action order. + // Actions beyond ActionIdx were not assigned to any runner (insufficient capacity). + std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"}); std::vector<size_t> PerRunnerIdx(RunnerCount, 0); - for (size_t i = 0; i < Actions.size(); ++i) + for (size_t i = 0; i < ActionIdx; ++i) { size_t RunnerIdx = ActionRunnerIndex[i]; size_t Idx = PerRunnerIdx[RunnerIdx]++; diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp index e79a6c90f..9055005d9 100644 --- a/src/zencompute/runners/linuxrunner.cpp +++ b/src/zencompute/runners/linuxrunner.cpp @@ -331,6 +331,8 @@ LinuxProcessRunner::LinuxProcessRunner(ChunkResolver& Resolver, { ZEN_INFO("namespace sandboxing enabled for child processes"); } + + StartMonitorThread(); } SubmitResult diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp index b61e0a46f..1b748c0e5 100644 --- a/src/zencompute/runners/localrunner.cpp +++ b/src/zencompute/runners/localrunner.cpp @@ -4,6 +4,8 @@ #if ZEN_WITH_COMPUTE_SERVICES +# include "pathvalidation.h" + # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> # include <zencore/compactbinarypackage.h> @@ -104,8 +106,6 @@ LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, ZEN_INFO("Cleanup complete"); } - m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; - # if ZEN_PLATFORM_WINDOWS // Suppress any error dialogs caused by missing dependencies UINT OldMode = ::SetErrorMode(0); @@ -382,6 +382,8 @@ LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromP const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); + ValidateSandboxRelativePath(Name); + CompressedBuffer Compressed; if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) @@ -457,7 +459,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, for (auto& It : WorkerDescription["dirs"sv]) { - std::string_view Name = It.AsString(); + std::string_view Name = It.AsString(); + ValidateSandboxRelativePath(Name); std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; // Validate dir path stays within sandbox @@ -482,6 +485,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, } WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); + + ZEN_INFO("manifested worker '{}' in '{}'", WorkerPackage.GetObjectHash(), SandboxPath); } CbPackage @@ -540,6 +545,12 @@ LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) } void +LocalProcessRunner::StartMonitorThread() +{ + m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; +} + +void LocalProcessRunner::MonitorThreadFunction() { SetCurrentThreadName("LocalProcessRunner_Monitor"); diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h index b8cff6826..d6589db43 100644 --- a/src/zencompute/runners/localrunner.h +++ b/src/zencompute/runners/localrunner.h @@ -67,6 +67,7 @@ protected: { Ref<RunnerAction> Action; void* ProcessHandle = nullptr; + int Pid = 0; int ExitCode = 0; std::filesystem::path SandboxPath; @@ -83,8 +84,6 @@ protected: std::filesystem::path m_SandboxPath; int32_t m_MaxRunningActions = 64; // arbitrary limit for testing - // if used in conjuction with m_ResultsLock, this lock must be taken *after* - // m_ResultsLock to avoid deadlocks RwLock m_RunningLock; std::unordered_map<int, Ref<RunningAction>> m_RunningMap; @@ -95,6 +94,7 @@ protected: std::thread m_MonitorThread; std::atomic<bool> m_MonitorThreadEnabled{true}; Event m_MonitorThreadEvent; + void StartMonitorThread(); void MonitorThreadFunction(); virtual void SweepRunningActions(); virtual void CancelRunningActions(); diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp index 5cec90699..c2ccca9a6 100644 --- a/src/zencompute/runners/macrunner.cpp +++ b/src/zencompute/runners/macrunner.cpp @@ -130,6 +130,8 @@ MacProcessRunner::MacProcessRunner(ChunkResolver& Resolver, { ZEN_INFO("Seatbelt sandboxing enabled for child processes"); } + + StartMonitorThread(); } SubmitResult diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp new file mode 100644 index 000000000..e4a7ba388 --- /dev/null +++ b/src/zencompute/runners/managedrunner.cpp @@ -0,0 +1,279 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "managedrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/except_fmt.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/scopeguard.h> +# include <zencore/timer.h> +# include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <asio/io_context.hpp> +# include <asio/executor_work_guard.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::compute { + +using namespace std::literals; + +ManagedProcessRunner::ManagedProcessRunner(ChunkResolver& Resolver, + const std::filesystem::path& BaseDir, + DeferredDirectoryDeleter& Deleter, + WorkerThreadPool& WorkerPool, + int32_t MaxConcurrentActions) +: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions) +, m_IoContext(std::make_unique<asio::io_context>()) +, m_SubprocessManager(std::make_unique<SubprocessManager>(*m_IoContext)) +{ + m_ProcessGroup = m_SubprocessManager->CreateGroup("compute-workers"); + + // Run the io_context on a small thread pool so that exit callbacks and + // metrics sampling are dispatched without blocking each other. + for (int i = 0; i < kIoThreadCount; ++i) + { + m_IoThreads.emplace_back([this, i] { + SetCurrentThreadName(fmt::format("mrunner_{}", i)); + + // work_guard keeps run() alive even when there is no pending work yet + auto WorkGuard = asio::make_work_guard(*m_IoContext); + + m_IoContext->run(); + }); + } +} + +ManagedProcessRunner::~ManagedProcessRunner() +{ + try + { + Shutdown(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception during managed process runner shutdown: {}", Ex.what()); + } +} + +void +ManagedProcessRunner::Shutdown() +{ + ZEN_TRACE_CPU("ManagedProcessRunner::Shutdown"); + m_AcceptNewActions = false; + + CancelRunningActions(); + + // Tear down the SubprocessManager before stopping the io_context so that + // any in-flight callbacks are drained cleanly. + if (m_SubprocessManager) + { + m_SubprocessManager->DestroyGroup("compute-workers"); + m_ProcessGroup = nullptr; + m_SubprocessManager.reset(); + } + + if (m_IoContext) + { + m_IoContext->stop(); + } + + for (std::thread& Thread : m_IoThreads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_IoThreads.clear(); +} + +SubmitResult +ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::SubmitAction"); + std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action); + + if (!Prepared) + { + return SubmitResult{.IsAccepted = false}; + } + + CbObject WorkerDescription = Prepared->WorkerPackage.GetObject(); + + // Parse environment variables from worker descriptor ("KEY=VALUE" strings) + // into the key-value pairs expected by CreateProcOptions. + std::vector<std::pair<std::string, std::string>> EnvPairs; + for (auto& It : WorkerDescription["environment"sv]) + { + std::string_view Str = It.AsString(); + size_t Eq = Str.find('='); + if (Eq != std::string_view::npos) + { + EnvPairs.emplace_back(std::string(Str.substr(0, Eq)), std::string(Str.substr(Eq + 1))); + } + } + + // Build command line + std::string_view ExecPath = WorkerDescription["path"sv].AsString(); + std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred(); + + std::string CommandLine = fmt::format("\"{}\" -Build=build.action"sv, ExePath.string()); + + ZEN_DEBUG("Executing (managed): '{}' (sandbox='{}')", CommandLine, Prepared->SandboxPath); + + CreateProcOptions Options; + Options.WorkingDirectory = &Prepared->SandboxPath; + Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.Environment = std::move(EnvPairs); + + const int32_t ActionLsn = Prepared->ActionLsn; + + ManagedProcess* Proc = nullptr; + + try + { + Proc = m_ProcessGroup->Spawn(ExePath, CommandLine, Options, [this, ActionLsn](ManagedProcess& /*Process*/, int ExitCode) { + OnProcessExit(ActionLsn, ExitCode); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed to spawn process for action LSN {}: {}", ActionLsn, Ex.what()); + m_DeferredDeleter.Enqueue(ActionLsn, std::move(Prepared->SandboxPath)); + return SubmitResult{.IsAccepted = false}; + } + + { + Ref<RunningAction> NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = static_cast<void*>(Proc); + NewAction->Pid = Proc->Pid(); + NewAction->SandboxPath = std::move(Prepared->SandboxPath); + + RwLock::ExclusiveLockScope _(m_RunningLock); + m_RunningMap[ActionLsn] = std::move(NewAction); + } + + Action->SetActionState(RunnerAction::State::Running); + + ZEN_DEBUG("Managed runner: action LSN {} -> PID {}", ActionLsn, Proc->Pid()); + + return SubmitResult{.IsAccepted = true}; +} + +void +ManagedProcessRunner::OnProcessExit(int ActionLsn, int ExitCode) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::OnProcessExit"); + + Ref<RunningAction> Running; + + m_RunningLock.WithExclusiveLock([&] { + auto It = m_RunningMap.find(ActionLsn); + if (It != m_RunningMap.end()) + { + Running = std::move(It->second); + m_RunningMap.erase(It); + } + }); + + if (!Running) + { + return; + } + + ZEN_DEBUG("Managed runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), ActionLsn, Running->Pid, ExitCode); + + Running->ExitCode = ExitCode; + + // Capture final CPU metrics from the managed process before it is removed. + auto* Proc = static_cast<ManagedProcess*>(Running->ProcessHandle); + if (Proc) + { + ProcessMetrics Metrics = Proc->GetLatestMetrics(); + float CpuMs = static_cast<float>(Metrics.UserTimeMs + Metrics.KernelTimeMs); + Running->Action->CpuSeconds.store(CpuMs / 1000.0f, std::memory_order_relaxed); + + float CpuPct = Proc->GetCpuUsagePercent(); + if (CpuPct >= 0.0f) + { + Running->Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed); + } + } + + Running->ProcessHandle = nullptr; + + std::vector<Ref<RunningAction>> CompletedActions; + CompletedActions.push_back(std::move(Running)); + ProcessCompletedActions(CompletedActions); +} + +void +ManagedProcessRunner::CancelRunningActions() +{ + ZEN_TRACE_CPU("ManagedProcessRunner::CancelRunningActions"); + + std::unordered_map<int, Ref<RunningAction>> RunningMap; + m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); + + if (RunningMap.empty()) + { + return; + } + + ZEN_INFO("cancelling {} running actions via process group", RunningMap.size()); + + Stopwatch Timer; + + // Kill all processes in the group atomically (TerminateJobObject on Windows, + // SIGTERM+SIGKILL on POSIX). + if (m_ProcessGroup) + { + m_ProcessGroup->KillAll(); + } + + for (auto& [Lsn, Running] : RunningMap) + { + m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath)); + Running->Action->SetActionState(RunnerAction::State::Failed); + } + + ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +bool +ManagedProcessRunner::CancelAction(int ActionLsn) +{ + ZEN_TRACE_CPU("ManagedProcessRunner::CancelAction"); + + ManagedProcess* Proc = nullptr; + + m_RunningLock.WithSharedLock([&] { + auto It = m_RunningMap.find(ActionLsn); + if (It != m_RunningMap.end() && It->second->ProcessHandle != nullptr) + { + Proc = static_cast<ManagedProcess*>(It->second->ProcessHandle); + } + }); + + if (!Proc) + { + return false; + } + + // Terminate the process. The exit callback will handle the rest + // (remove from running map, gather outputs or mark failed). + Proc->Terminate(222); + + ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn); + return true; +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/runners/managedrunner.h b/src/zencompute/runners/managedrunner.h new file mode 100644 index 000000000..21a44d43c --- /dev/null +++ b/src/zencompute/runners/managedrunner.h @@ -0,0 +1,64 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "localrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zenutil/process/subprocessmanager.h> + +# include <memory> + +namespace asio { +class io_context; +} + +namespace zen::compute { + +/** Cross-platform process runner backed by SubprocessManager. + + Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting, + input/output handling, and shared action preparation. Replaces the polling-based + monitor thread with async exit callbacks driven by SubprocessManager, and + delegates CPU/memory metrics sampling to the manager's built-in round-robin + sampler. + + A ProcessGroup (backed by a JobObject on Windows, process group on POSIX) is + used for bulk cancellation on shutdown. + + This runner does not perform any platform-specific sandboxing (AppContainer, + namespaces, Seatbelt). It is intended as a simpler, cross-platform alternative + to the platform-specific runners for non-sandboxed workloads. + */ +class ManagedProcessRunner : public LocalProcessRunner +{ +public: + ManagedProcessRunner(ChunkResolver& Resolver, + const std::filesystem::path& BaseDir, + DeferredDirectoryDeleter& Deleter, + WorkerThreadPool& WorkerPool, + int32_t MaxConcurrentActions = 0); + ~ManagedProcessRunner(); + + void Shutdown() override; + [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + void CancelRunningActions() override; + bool CancelAction(int ActionLsn) override; + [[nodiscard]] bool IsHealthy() override { return true; } + +private: + static constexpr int kIoThreadCount = 4; + + // Exit callback posted on an io_context thread. + void OnProcessExit(int ActionLsn, int ExitCode); + + std::unique_ptr<asio::io_context> m_IoContext; + std::unique_ptr<SubprocessManager> m_SubprocessManager; + ProcessGroup* m_ProcessGroup = nullptr; + std::vector<std::thread> m_IoThreads; +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp index cd4b646e9..92ee65c2d 100644 --- a/src/zencompute/runners/windowsrunner.cpp +++ b/src/zencompute/runners/windowsrunner.cpp @@ -21,6 +21,12 @@ ZEN_THIRD_PARTY_INCLUDES_START # include <sddl.h> ZEN_THIRD_PARTY_INCLUDES_END +// JOB_OBJECT_UILIMIT_ERRORMODE is defined in winuser.h which may be +// excluded by WIN32_LEAN_AND_MEAN. +# if !defined(JOB_OBJECT_UILIMIT_ERRORMODE) +# define JOB_OBJECT_UILIMIT_ERRORMODE 0x00000400 +# endif + namespace zen::compute { using namespace std::literals; @@ -34,38 +40,65 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver, : LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions) , m_Sandboxed(Sandboxed) { - if (!m_Sandboxed) + // Create a job object shared by all child processes. Restricting the + // error-mode UI prevents crash dialogs (WER / Dr. Watson) from + // blocking the monitor thread when a worker process terminates + // abnormally. + m_JobObject = CreateJobObjectW(nullptr, nullptr); + if (m_JobObject) { - return; + JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{}; + ExtLimits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION; + SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits)); + + JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{}; + UiRestrictions.UIRestrictionsClass = JOB_OBJECT_UILIMIT_ERRORMODE; + SetInformationJobObject(m_JobObject, JobObjectBasicUIRestrictions, &UiRestrictions, sizeof(UiRestrictions)); + + // Set error mode on this process so children inherit it. The + // UILIMIT_ERRORMODE restriction above prevents them from clearing + // SEM_NOGPFAULTERRORBOX. + SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX); } - // Build a unique profile name per process to avoid collisions - m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId()); + if (m_Sandboxed) + { + // Build a unique profile name per process to avoid collisions + m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId()); - // Clean up any stale profile from a previous crash - DeleteAppContainerProfile(m_AppContainerName.c_str()); + // Clean up any stale profile from a previous crash + DeleteAppContainerProfile(m_AppContainerName.c_str()); - PSID Sid = nullptr; + PSID Sid = nullptr; - HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(), - m_AppContainerName.c_str(), // display name - m_AppContainerName.c_str(), // description - nullptr, // no capabilities - 0, // capability count - &Sid); + HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(), + m_AppContainerName.c_str(), // display name + m_AppContainerName.c_str(), // description + nullptr, // no capabilities + 0, // capability count + &Sid); - if (FAILED(Hr)) - { - throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr)); - } + if (FAILED(Hr)) + { + throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr)); + } - m_AppContainerSid = Sid; + m_AppContainerSid = Sid; + + ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName)); + } - ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName)); + StartMonitorThread(); } WindowsProcessRunner::~WindowsProcessRunner() { + if (m_JobObject) + { + CloseHandle(m_JobObject); + m_JobObject = nullptr; + } + if (m_AppContainerSid) { FreeSid(m_AppContainerSid); @@ -172,9 +205,9 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action) LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; BOOL bInheritHandles = FALSE; - DWORD dwCreationFlags = DETACHED_PROCESS; + DWORD dwCreationFlags = CREATE_SUSPENDED | DETACHED_PROCESS; - ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed); + ZEN_DEBUG("{}: '{}' (sandbox='{}')", m_Sandboxed ? "Sandboxing" : "Executing", WideToUtf8(CommandLine.c_str()), Prepared->SandboxPath); CommandLine.EnsureNulTerminated(); @@ -260,14 +293,21 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action) } } - CloseHandle(ProcessInformation.hThread); + if (m_JobObject) + { + AssignProcessToJobObject(m_JobObject, ProcessInformation.hProcess); + } - Ref<RunningAction> NewAction{new RunningAction()}; - NewAction->Action = Action; - NewAction->ProcessHandle = ProcessInformation.hProcess; - NewAction->SandboxPath = std::move(Prepared->SandboxPath); + ResumeThread(ProcessInformation.hThread); + CloseHandle(ProcessInformation.hThread); { + Ref<RunningAction> NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = ProcessInformation.hProcess; + NewAction->Pid = ProcessInformation.dwProcessId; + NewAction->SandboxPath = std::move(Prepared->SandboxPath); + RwLock::ExclusiveLockScope _(m_RunningLock); m_RunningMap[Prepared->ActionLsn] = std::move(NewAction); @@ -275,6 +315,8 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action) Action->SetActionState(RunnerAction::State::Running); + ZEN_DEBUG("Local runner: action LSN {} -> PID {}", Action->ActionLsn, ProcessInformation.dwProcessId); + return SubmitResult{.IsAccepted = true}; } @@ -294,6 +336,11 @@ WindowsProcessRunner::SweepRunningActions() if (IsSuccess && ExitCode != STILL_ACTIVE) { + ZEN_DEBUG("Local runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), + Running->Action->ActionLsn, + Running->Pid, + ExitCode); + CloseHandle(Running->ProcessHandle); Running->ProcessHandle = INVALID_HANDLE_VALUE; Running->ExitCode = ExitCode; diff --git a/src/zencompute/runners/windowsrunner.h b/src/zencompute/runners/windowsrunner.h index 9f2385cc4..adeaf02fc 100644 --- a/src/zencompute/runners/windowsrunner.h +++ b/src/zencompute/runners/windowsrunner.h @@ -46,6 +46,7 @@ private: bool m_Sandboxed = false; PSID m_AppContainerSid = nullptr; std::wstring m_AppContainerName; + HANDLE m_JobObject = nullptr; }; } // namespace zen::compute diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp index 506bec73b..b4fafb467 100644 --- a/src/zencompute/runners/winerunner.cpp +++ b/src/zencompute/runners/winerunner.cpp @@ -36,6 +36,8 @@ WineProcessRunner::WineProcessRunner(ChunkResolver& Resolver, sigemptyset(&Action.sa_mask); Action.sa_handler = SIG_DFL; sigaction(SIGCHLD, &Action, nullptr); + + StartMonitorThread(); } SubmitResult |