aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute
diff options
context:
space:
mode:
author Stefan Boberg <[email protected]> 2026-03-30 15:07:08 +0200
committer GitHub Enterprise <[email protected]> 2026-03-30 15:07:08 +0200
commit 3540d676733efaddecf504b30e9a596465bd43f8 (patch)
tree 7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zencompute
parent include rawHash in structure output for builds ls command (#903) (diff)
download zen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz
zen-3540d676733efaddecf504b30e9a596465bd43f8.zip
Request validation and resilience improvements (#864)
### Security: Input validation & path safety - **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected) - **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching - **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads - **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters - **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data - **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes - **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size ### Reliability: Lock consolidation & bug fixes - **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult` - **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity" - **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\\server\share` now correctly becomes `\\?\UNC\server\share` instead of `\\?\server\share` - **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries" ### New: ManagedProcessRunner - Cross-platform process runner 
backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler - `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown - `--managed` flag on `zen exec inproc` to select this runner - Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor ### Process management - **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread - **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution - **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation - Consistent PID logging across all runner types ### Test infrastructure - **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions - **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue - **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed) - **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection - **UNC path tests** for `MakeSafeAbsolutePath` ### Misc - ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc. 
- Generic `fmt::formatter` for types with free `ToString` functions - Compute dashboard: truncated hash display with monospace font and hover for full value - Renamed `usonpackage_forcelink` → `cbpackage_forcelink` - Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zencompute')
-rw-r--r-- src/zencompute/CLAUDE.md 10
-rw-r--r-- src/zencompute/computeservice.cpp 257
-rw-r--r-- src/zencompute/httpcomputeservice.cpp 104
-rw-r--r-- src/zencompute/include/zencompute/computeservice.h 1
-rw-r--r-- src/zencompute/pathvalidation.h 118
-rw-r--r-- src/zencompute/runners/functionrunner.cpp 12
-rw-r--r-- src/zencompute/runners/linuxrunner.cpp 2
-rw-r--r-- src/zencompute/runners/localrunner.cpp 17
-rw-r--r-- src/zencompute/runners/localrunner.h 4
-rw-r--r-- src/zencompute/runners/macrunner.cpp 2
-rw-r--r-- src/zencompute/runners/managedrunner.cpp 279
-rw-r--r-- src/zencompute/runners/managedrunner.h 64
-rw-r--r-- src/zencompute/runners/windowsrunner.cpp 99
-rw-r--r-- src/zencompute/runners/windowsrunner.h 1
-rw-r--r-- src/zencompute/runners/winerunner.cpp 2
15 files changed, 767 insertions, 205 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md
index a1a39fc3c..750879d5a 100644
--- a/src/zencompute/CLAUDE.md
+++ b/src/zencompute/CLAUDE.md
@@ -141,7 +141,7 @@ Actions that fail or are abandoned can be automatically retried or manually resc
**Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure.
-**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Both automatic and manual paths respect this limit.
+**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Setting `max_retries=0` disables automatic retry entirely; omitting the field (or setting it to a negative value) uses the default of 3. Both automatic and manual paths respect this limit.
**Retraction (API path):** `RetractAction(Lsn)` pulls a Pending/Submitting/Running action back for rescheduling on a different runner. The action transitions to Retracted, then `ResetActionStateToPending()` is called *without* incrementing `RetryCount`. Retraction is idempotent.
@@ -156,7 +156,7 @@ Queues group actions from a single client session. A `QueueEntry` (internal) tra
- `ActiveLsns` — for cancellation lookup (under `m_Lock`)
- `FinishedLsns` — moved here when actions complete
- `IdleSince` — used for 15-minute automatic expiry
-- `Config` — CbObject set at creation; supports `max_retries` (int) to override the default retry limit
+- `Config` — CbObject set at creation; supports `max_retries` (int, default 3) to override the default retry limit. `0` = no retries, negative or absent = use default
**Queue state machine (`QueueState` enum):**
```
@@ -216,11 +216,7 @@ Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `Han
## Concurrency Model
-**Locking discipline:** When multiple locks must be held simultaneously, always acquire in this order to prevent deadlocks:
-1. `m_ResultsLock`
-2. `m_RunningLock` (comment in localrunner.h: "must be taken *after* m_ResultsLock")
-3. `m_PendingLock`
-4. `m_QueueLock`
+**Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock.
**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index 92901de64..58761556a 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -8,6 +8,8 @@
# include "recording/actionrecorder.h"
# include "runners/localrunner.h"
# include "runners/remotehttprunner.h"
+# include "runners/managedrunner.h"
+# include "pathvalidation.h"
# if ZEN_PLATFORM_LINUX
# include "runners/linuxrunner.h"
# elif ZEN_PLATFORM_WINDOWS
@@ -195,13 +197,9 @@ struct ComputeServiceSession::Impl
std::atomic<IComputeCompletionObserver*> m_CompletionObserver{nullptr};
- RwLock m_PendingLock;
- std::map<int, Ref<RunnerAction>> m_PendingActions;
-
- RwLock m_RunningLock;
+ RwLock m_ActionMapLock; // Guards m_PendingActions, m_RunningMap, m_ResultsMap
+ std::map<int, Ref<RunnerAction>> m_PendingActions;
std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
-
- RwLock m_ResultsLock;
std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
metrics::Meter m_ResultRate;
std::atomic<uint64_t> m_RetiredCount{0};
@@ -343,9 +341,12 @@ struct ComputeServiceSession::Impl
ActionCounts GetActionCounts()
{
ActionCounts Counts;
- Counts.Pending = (int)m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- Counts.Running = (int)m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
- Counts.Completed = (int)m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }) + (int)m_RetiredCount.load();
+ m_ActionMapLock.WithSharedLock([&] {
+ Counts.Pending = (int)m_PendingActions.size();
+ Counts.Running = (int)m_RunningMap.size();
+ Counts.Completed = (int)m_ResultsMap.size();
+ });
+ Counts.Completed += (int)m_RetiredCount.load();
Counts.ActiveQueues = (int)m_QueueLock.WithSharedLock([&] {
size_t Count = 0;
for (const auto& [Id, Queue] : m_Queues)
@@ -364,8 +365,10 @@ struct ComputeServiceSession::Impl
{
Cbo << "session_state"sv << ToString(m_SessionState.load(std::memory_order_relaxed));
m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
- m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
- m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+ m_ActionMapLock.WithSharedLock([&] {
+ Cbo << "actions_complete"sv << m_ResultsMap.size();
+ Cbo << "actions_pending"sv << m_PendingActions.size();
+ });
Cbo << "actions_submitted"sv << GetSubmittedActionCount();
EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo);
EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
@@ -450,27 +453,17 @@ ComputeServiceSession::Impl::RequestStateTransition(SessionState NewState)
void
ComputeServiceSession::Impl::AbandonAllActions()
{
- // Collect all pending actions and mark them as Abandoned
+ // Collect all pending and running actions under a single lock scope
std::vector<Ref<RunnerAction>> PendingToAbandon;
+ std::vector<Ref<RunnerAction>> RunningToAbandon;
- m_PendingLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
PendingToAbandon.reserve(m_PendingActions.size());
for (auto& [Lsn, Action] : m_PendingActions)
{
PendingToAbandon.push_back(Action);
}
- });
-
- for (auto& Action : PendingToAbandon)
- {
- Action->SetActionState(RunnerAction::State::Abandoned);
- }
- // Collect all running actions and mark them as Abandoned, then
- // best-effort cancel via the local runner group
- std::vector<Ref<RunnerAction>> RunningToAbandon;
-
- m_RunningLock.WithSharedLock([&] {
RunningToAbandon.reserve(m_RunningMap.size());
for (auto& [Lsn, Action] : m_RunningMap)
{
@@ -478,6 +471,11 @@ ComputeServiceSession::Impl::AbandonAllActions()
}
});
+ for (auto& Action : PendingToAbandon)
+ {
+ Action->SetActionState(RunnerAction::State::Abandoned);
+ }
+
for (auto& Action : RunningToAbandon)
{
Action->SetActionState(RunnerAction::State::Abandoned);
@@ -742,7 +740,7 @@ std::vector<ComputeServiceSession::RunningActionInfo>
ComputeServiceSession::Impl::GetRunningActions()
{
std::vector<ComputeServiceSession::RunningActionInfo> Result;
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
Result.reserve(m_RunningMap.size());
for (const auto& [Lsn, Action] : m_RunningMap)
{
@@ -810,6 +808,11 @@ void
ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker)
{
ZEN_TRACE_CPU("ComputeServiceSession::RegisterWorker");
+
+ // Validate all paths in the worker description upfront, before the worker is
+ // distributed to runners. This rejects malicious packages early at ingestion time.
+ ValidateWorkerDescriptionPaths(Worker.GetObject());
+
RwLock::ExclusiveLockScope _(m_WorkerLock);
const IoHash& WorkerId = Worker.GetObject().GetHash();
@@ -994,10 +997,15 @@ ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worke
Pending->ActionObj = ActionObj;
Pending->Priority = RequestPriority;
- // For now simply put action into pending state, so we can do batch scheduling
+ // Insert into the pending map immediately so the action is visible to
+ // FindActionResult/GetActionResult right away. SetActionState will call
+ // PostUpdate which adds the action to m_UpdatedActions and signals the
+ // scheduler, but the scheduler's HandleActionUpdates inserts with
+ // std::map::insert which is a no-op for existing keys.
ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Pending}); });
Pending->SetActionState(RunnerAction::State::Pending);
if (m_Recorder)
@@ -1043,11 +1051,7 @@ ComputeServiceSession::Impl::GetSubmittedActionCount()
HttpResponseCode
ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
+ RwLock::ExclusiveLockScope _(m_ActionMapLock);
if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
{
@@ -1058,25 +1062,14 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult
return HttpResponseCode::OK;
}
+ if (m_PendingActions.find(ActionLsn) != m_PendingActions.end())
{
- RwLock::SharedLockScope __(m_PendingLock);
-
- if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
+ if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
{
- RwLock::SharedLockScope __(m_RunningLock);
-
- if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
return HttpResponseCode::NotFound;
@@ -1085,11 +1078,7 @@ ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResult
HttpResponseCode
ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
+ RwLock::ExclusiveLockScope _(m_ActionMapLock);
for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
{
@@ -1103,30 +1092,19 @@ ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage&
}
}
+ for (const auto& [K, Pending] : m_PendingActions)
{
- RwLock::SharedLockScope __(m_PendingLock);
-
- for (const auto& [K, Pending] : m_PendingActions)
+ if (Pending->ActionId == ActionId)
{
- if (Pending->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
}
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
+ for (const auto& [K, v] : m_RunningMap)
{
- RwLock::SharedLockScope __(m_RunningLock);
-
- for (const auto& [K, v] : m_RunningMap)
+ if (v->ActionId == ActionId)
{
- if (v->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
+ return HttpResponseCode::Accepted;
}
}
@@ -1144,7 +1122,7 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo)
{
Cbo.BeginArray("completed");
- m_ResultsLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (auto& [Lsn, Action] : m_ResultsMap)
{
Cbo.BeginObject();
@@ -1275,20 +1253,14 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId)
std::vector<Ref<RunnerAction>> PendingActionsToCancel;
std::vector<int> RunningLsnsToCancel;
- m_PendingLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (int Lsn : LsnsToCancel)
{
if (auto It = m_PendingActions.find(Lsn); It != m_PendingActions.end())
{
PendingActionsToCancel.push_back(It->second);
}
- }
- });
-
- m_RunningLock.WithSharedLock([&] {
- for (int Lsn : LsnsToCancel)
- {
- if (m_RunningMap.find(Lsn) != m_RunningMap.end())
+ else if (m_RunningMap.find(Lsn) != m_RunningMap.end())
{
RunningLsnsToCancel.push_back(Lsn);
}
@@ -1307,7 +1279,7 @@ ComputeServiceSession::Impl::CancelQueue(int QueueId)
// transition from the runner is blocked (Cancelled > Failed in the enum).
for (int Lsn : RunningLsnsToCancel)
{
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
if (auto It = m_RunningMap.find(Lsn); It != m_RunningMap.end())
{
It->second->SetActionState(RunnerAction::State::Cancelled);
@@ -1445,7 +1417,7 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo)
if (Queue)
{
Queue->m_Lock.WithSharedLock([&] {
- m_ResultsLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (int Lsn : Queue->FinishedLsns)
{
if (m_ResultsMap.contains(Lsn))
@@ -1541,9 +1513,15 @@ ComputeServiceSession::Impl::SchedulePendingActions()
{
ZEN_TRACE_CPU("ComputeServiceSession::SchedulePendingActions");
int ScheduledCount = 0;
- size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
- size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+ size_t RunningCount = 0;
+ size_t PendingCount = 0;
+ size_t ResultCount = 0;
+
+ m_ActionMapLock.WithSharedLock([&] {
+ RunningCount = m_RunningMap.size();
+ PendingCount = m_PendingActions.size();
+ ResultCount = m_ResultsMap.size();
+ });
static Stopwatch DumpRunningTimer;
@@ -1560,7 +1538,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
DumpRunningTimer.Reset();
std::set<int> RunningList;
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
for (auto& [K, V] : m_RunningMap)
{
RunningList.insert(K);
@@ -1602,7 +1580,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
// Also note that the m_PendingActions list is not maintained
// here, that's done periodically in SchedulePendingActions()
- m_PendingLock.WithExclusiveLock([&] {
+ m_ActionMapLock.WithExclusiveLock([&] {
if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
{
return;
@@ -1701,7 +1679,7 @@ ComputeServiceSession::Impl::SchedulerThreadFunction()
{
int TimeoutMs = 500;
- auto PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ auto PendingCount = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.size(); });
if (PendingCount)
{
@@ -1720,22 +1698,22 @@ ComputeServiceSession::Impl::SchedulerThreadFunction()
m_SchedulingThreadEvent.Reset();
}
- ZEN_DEBUG("compute scheduler TICK (Pending: {} was {}, Running: {}, Results: {}) timeout: {}",
- m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }),
- PendingCount,
- m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }),
- m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }),
- TimeoutMs);
+ m_ActionMapLock.WithSharedLock([&] {
+ ZEN_DEBUG("compute scheduler TICK (Pending: {}, Running: {}, Results: {}) timeout: {}",
+ m_PendingActions.size(),
+ m_RunningMap.size(),
+ m_ResultsMap.size(),
+ TimeoutMs);
+ });
HandleActionUpdates();
// Auto-transition Draining → Paused when all work is done
if (m_SessionState.load(std::memory_order_relaxed) == SessionState::Draining)
{
- size_t Pending = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- size_t Running = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+ bool AllDrained = m_ActionMapLock.WithSharedLock([&] { return m_PendingActions.empty() && m_RunningMap.empty(); });
- if (Pending == 0 && Running == 0)
+ if (AllDrained)
{
SessionState Expected = SessionState::Draining;
if (m_SessionState.compare_exchange_strong(Expected, SessionState::Paused, std::memory_order_acq_rel))
@@ -1776,9 +1754,9 @@ ComputeServiceSession::Impl::GetMaxRetriesForQueue(int QueueId)
if (Config)
{
- int Value = Config["max_retries"].AsInt32(0);
+ int Value = Config["max_retries"].AsInt32(-1);
- if (Value > 0)
+ if (Value >= 0)
{
return Value;
}
@@ -1797,7 +1775,7 @@ ComputeServiceSession::Impl::RescheduleAction(int ActionLsn)
// Find, validate, and remove atomically under a single lock scope to prevent
// concurrent RescheduleAction calls from double-removing the same action.
- m_ResultsLock.WithExclusiveLock([&] {
+ m_ActionMapLock.WithExclusiveLock([&] {
auto It = m_ResultsMap.find(ActionLsn);
if (It == m_ResultsMap.end())
{
@@ -1871,26 +1849,20 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn)
bool WasRunning = false;
// Look for the action in pending or running maps
- m_RunningLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
if (auto It = m_RunningMap.find(ActionLsn); It != m_RunningMap.end())
{
Action = It->second;
WasRunning = true;
}
+ else if (auto PIt = m_PendingActions.find(ActionLsn); PIt != m_PendingActions.end())
+ {
+ Action = PIt->second;
+ }
});
if (!Action)
{
- m_PendingLock.WithSharedLock([&] {
- if (auto It = m_PendingActions.find(ActionLsn); It != m_PendingActions.end())
- {
- Action = It->second;
- }
- });
- }
-
- if (!Action)
- {
return {.Success = false, .Error = "Action not found in pending or running maps"};
}
@@ -1912,18 +1884,15 @@ ComputeServiceSession::Impl::RetractAction(int ActionLsn)
void
ComputeServiceSession::Impl::RemoveActionFromActiveMaps(int ActionLsn)
{
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
- {
- m_PendingActions.erase(ActionLsn);
- }
- else
- {
- m_RunningMap.erase(FindIt);
- }
- });
- });
+ // Caller must hold m_ActionMapLock exclusively.
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
}
void
@@ -1973,7 +1942,7 @@ ComputeServiceSession::Impl::HandleActionUpdates()
}
else
{
- m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ m_ActionMapLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
}
break;
@@ -1983,11 +1952,9 @@ ComputeServiceSession::Impl::HandleActionUpdates()
// Dispatched to a runner — move from pending to running
case RunnerAction::State::Running:
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- m_RunningMap.insert({ActionLsn, Action});
- m_PendingActions.erase(ActionLsn);
- });
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.insert({ActionLsn, Action});
+ m_PendingActions.erase(ActionLsn);
});
ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
break;
@@ -1995,7 +1962,10 @@ ComputeServiceSession::Impl::HandleActionUpdates()
// Retracted — pull back for rescheduling without counting against retry limit
case RunnerAction::State::Retracted:
{
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.erase(ActionLsn);
+ m_PendingActions[ActionLsn] = Action;
+ });
Action->ResetActionStateToPending();
ZEN_INFO("action {} ({}) retracted for rescheduling", Action->ActionId, ActionLsn);
break;
@@ -2019,7 +1989,10 @@ ComputeServiceSession::Impl::HandleActionUpdates()
if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries)
{
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ m_RunningMap.erase(ActionLsn);
+ m_PendingActions[ActionLsn] = Action;
+ });
// Reset triggers PostUpdate() which re-enters the action as Pending
Action->ResetActionStateToPending();
@@ -2034,16 +2007,16 @@ ComputeServiceSession::Impl::HandleActionUpdates()
}
}
- RemoveActionFromActiveMaps(ActionLsn);
+ m_ActionMapLock.WithExclusiveLock([&] {
+ RemoveActionFromActiveMaps(ActionLsn);
- // Update queue counters BEFORE publishing the result into
- // m_ResultsMap. GetActionResult erases from m_ResultsMap
- // under m_ResultsLock, so if we updated counters after
- // releasing that lock, a caller could observe ActiveCount
- // still at 1 immediately after GetActionResult returned OK.
- NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
+ // Update queue counters BEFORE publishing the result into
+ // m_ResultsMap. GetActionResult erases from m_ResultsMap
+ // under m_ActionMapLock, so if we updated counters after
+ // releasing that lock, a caller could observe ActiveCount
+ // still at 1 immediately after GetActionResult returned OK.
+ NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
- m_ResultsLock.WithExclusiveLock([&] {
m_ResultsMap[ActionLsn] = Action;
// Append to bounded action history ring
@@ -2282,6 +2255,18 @@ ComputeServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::files
}
void
+ComputeServiceSession::AddManagedLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::AddManagedLocalRunner");
+
+ auto* NewRunner =
+ new ManagedProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, MaxConcurrentActions);
+
+ m_Impl->SyncWorkersToRunner(*NewRunner);
+ m_Impl->m_LocalRunnerGroup.AddRunner(NewRunner);
+}
+
+void
ComputeServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
{
ZEN_TRACE_CPU("ComputeServiceSession::AddRemoteRunner");
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index bdfd9d197..bd3f4e70e 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -93,13 +93,14 @@ struct HttpComputeService::Impl
uint64_t NewBytes = 0;
};
- IngestStats IngestPackageAttachments(const CbPackage& Package);
- bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList);
- void HandleWorkersGet(HttpServerRequest& HttpReq);
- void HandleWorkersAllGet(HttpServerRequest& HttpReq);
- void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
- void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
- void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
+ bool IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats);
+ bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList);
+ bool ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment);
+ void HandleWorkersGet(HttpServerRequest& HttpReq);
+ void HandleWorkersAllGet(HttpServerRequest& HttpReq);
+ void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
+ void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
+ void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
// WebSocket / observer
void OnWebSocketOpen(Ref<WebSocketConnection> Connection);
@@ -373,7 +374,7 @@ HttpComputeService::Impl::RegisterRoutes()
if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
ResponseCode != HttpResponseCode::OK)
{
- ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
+ ZEN_DEBUG("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
if (ResponseCode == HttpResponseCode::NotFound)
{
@@ -1167,35 +1168,81 @@ HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::strin
return ParseInt<int>(Capture).value_or(0);
}
-HttpComputeService::Impl::IngestStats
-HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package)
+bool
+HttpComputeService::Impl::ValidateAttachmentHash(HttpServerRequest& HttpReq, const CbAttachment& Attachment)
{
- IngestStats Stats;
+ const IoHash ClaimedHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
+ const IoHash HeaderHash = Buffer.DecodeRawHash();
+
+ if (HeaderHash != ClaimedHash)
+ {
+ ZEN_WARN("attachment header hash mismatch: claimed {} but header contains {}", ClaimedHash, HeaderHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ IoHashStream Hasher;
+
+ bool DecompressOk = Buffer.DecompressToStream(
+ 0,
+ Buffer.DecodeRawSize(),
+ [&](uint64_t /*SourceOffset*/, uint64_t /*SourceSize*/, uint64_t /*Offset*/, const CompositeBuffer& Range) -> bool {
+ for (const SharedBuffer& Segment : Range.GetSegments())
+ {
+ Hasher.Append(Segment.GetView());
+ }
+ return true;
+ });
+
+ if (!DecompressOk)
+ {
+ ZEN_WARN("attachment {}: failed to decompress", ClaimedHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ const IoHash ActualHash = Hasher.GetHash();
+
+ if (ActualHash != ClaimedHash)
+ {
+ ZEN_WARN("attachment hash mismatch: claimed {} but decompressed data hashes to {}", ClaimedHash, ActualHash);
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ return false;
+ }
+
+ return true;
+}
+bool
+HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, const CbPackage& Package, IngestStats& OutStats)
+{
for (const CbAttachment& Attachment : Package.GetAttachments())
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
+ if (!ValidateAttachmentHash(HttpReq, Attachment))
+ {
+ return false;
+ }
- const uint64_t CompressedSize = DataView.GetCompressedSize();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
- Stats.Bytes += CompressedSize;
- ++Stats.Count;
+ OutStats.Bytes += CompressedSize;
+ ++OutStats.Count;
const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
- Stats.NewBytes += CompressedSize;
- ++Stats.NewCount;
+ OutStats.NewBytes += CompressedSize;
+ ++OutStats.NewCount;
}
}
- return Stats;
+ return true;
}
bool
@@ -1253,7 +1300,10 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
{
CbPackage Package = HttpReq.ReadPayloadPackage();
Body = Package.GetObject();
- Stats = IngestPackageAttachments(Package);
+ if (!IngestPackageAttachments(HttpReq, Package, Stats))
+ {
+ return; // validation failed, response already written
+ }
break;
}
@@ -1268,8 +1318,7 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
{
// --- Batch path ---
- // For CbObject payloads, check all attachments upfront before enqueuing anything
- if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
+ // Verify all action attachment references exist in the store
{
std::vector<IoHash> NeedList;
@@ -1345,7 +1394,6 @@ HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int Que
// --- Single-action path: Body is the action itself ---
- if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
{
std::vector<IoHash> NeedList;
@@ -1491,10 +1539,14 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
+ if (!ValidateAttachmentHash(HttpReq, Attachment))
+ {
+ return;
+ }
+
const IoHash DataHash = Attachment.GetHash();
CompressedBuffer Buffer = Attachment.AsCompressedBinary();
- ZEN_UNUSED(DataHash);
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h
index 1ca78738a..ad556f546 100644
--- a/src/zencompute/include/zencompute/computeservice.h
+++ b/src/zencompute/include/zencompute/computeservice.h
@@ -167,6 +167,7 @@ public:
// Action runners
void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions = 0);
+ void AddManagedLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions = 0);
void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName);
// Action submission
diff --git a/src/zencompute/pathvalidation.h b/src/zencompute/pathvalidation.h
new file mode 100644
index 000000000..c2e30183a
--- /dev/null
+++ b/src/zencompute/pathvalidation.h
@@ -0,0 +1,118 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/except_fmt.h>
+#include <zencore/string.h>
+
+#include <filesystem>
+#include <string_view>
+
+namespace zen::compute {
+
+// Validate that a single path component is a legal file/directory name on all
+// supported platforms. Uses Windows rules as the most restrictive superset, since
+// packages may be built on one platform and consumed on another.
+//
+//   Component - one element of a path; must not itself contain a separator
+//   FullPath  - the complete path the component came from (error messages only)
+//
+// Throws zen::invalid_argument when the component is empty, contains a control
+// character, a path separator or another character forbidden in Windows file
+// names, ends in a dot or space, or is a reserved Windows device name.
+inline void
+ValidatePathComponent(std::string_view Component, std::string_view FullPath)
+{
+    // Report empty components on their own so the message matches the problem
+    if (Component.empty())
+    {
+        throw zen::invalid_argument("empty path component in '{}'", FullPath);
+    }
+
+    // Reject control characters (0x00-0x1F) and characters forbidden on Windows.
+    // '/' and '\\' are part of that set: std::filesystem only treats '\\' as a
+    // separator on Windows, so on POSIX hosts a name such as "..\\..\\x" reaches
+    // this function as ONE component and would otherwise be accepted even though
+    // it is a traversal once interpreted with Windows rules. On Windows this also
+    // rejects the root-directory element of a rooted path such as "\\foo".
+    for (char Ch : Component)
+    {
+        if (static_cast<unsigned char>(Ch) < 0x20 || Ch == '<' || Ch == '>' || Ch == ':' || Ch == '"' || Ch == '|' || Ch == '?' ||
+            Ch == '*' || Ch == '/' || Ch == '\\')
+        {
+            throw zen::invalid_argument("invalid character in path component '{}' of '{}'", Component, FullPath);
+        }
+    }
+
+    // Reject trailing dots or spaces (silently stripped on Windows, leading to confusion)
+    if (Component.back() == '.' || Component.back() == ' ')
+    {
+        throw zen::invalid_argument("path component '{}' of '{}' has trailing dot or space", Component, FullPath);
+    }
+
+    // Reject Windows reserved device names (CON, PRN, AUX, NUL, COM1-9, LPT1-9)
+    // These are reserved with or without an extension (e.g. "CON.txt" is still reserved),
+    // so only the part before the first dot is compared. find() returning npos makes
+    // substr() yield the whole component.
+    std::string_view Stem = Component.substr(0, Component.find('.'));
+
+    static constexpr std::string_view ReservedNames[] = {
+        "CON",  "PRN",  "AUX",  "NUL",  "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7",
+        "COM8", "COM9", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
+    };
+
+    for (std::string_view Reserved : ReservedNames)
+    {
+        if (zen::StrCaseCompare(Stem, Reserved) == 0)
+        {
+            throw zen::invalid_argument("path component '{}' of '{}' uses reserved device name '{}'", Component, FullPath, Reserved);
+        }
+    }
+}
+
+// Validate that a path extracted from a package is a safe relative path.
+//
+//   Name - the path as it appears in the package
+//
+// Throws zen::invalid_argument when the path is empty, absolute, carries a root
+// name or root directory, contains a ".." component, or has a component that
+// ValidatePathComponent rejects.
+inline void
+ValidateSandboxRelativePath(std::string_view Name)
+{
+    if (Name.empty())
+    {
+        throw zen::invalid_argument("path traversal detected: empty path name");
+    }
+
+    std::filesystem::path Parsed(Name);
+
+    if (Parsed.is_absolute())
+    {
+        throw zen::invalid_argument("path traversal detected: '{}' is an absolute path", Name);
+    }
+
+    // is_absolute() is not sufficient on Windows: "\\foo" (root directory, no drive)
+    // and "C:foo" (drive-relative) are both reported as relative, yet appending
+    // either to the sandbox root with operator/ replaces the sandbox directory
+    // instead of extending it. On POSIX this check is equivalent to is_absolute().
+    if (Parsed.has_root_name() || Parsed.has_root_directory())
+    {
+        throw zen::invalid_argument("path traversal detected: '{}' has a root name or root directory", Name);
+    }
+
+    for (const auto& Component : Parsed)
+    {
+        std::string ComponentStr = Component.string();
+
+        if (ComponentStr == "..")
+        {
+            throw zen::invalid_argument("path traversal detected: '{}' contains '..' component", Name);
+        }
+
+        // Skip "." (current directory) — harmless in relative paths
+        if (ComponentStr != ".")
+        {
+            ValidatePathComponent(ComponentStr, Name);
+        }
+    }
+}
+
+// Run ValidateSandboxRelativePath over every path-like entry of a worker
+// description CbObject: the optional top-level "path", then executables[].name,
+// dirs[] and files[].name, in that order.
+// The first invalid entry aborts validation with the exception thrown by
+// ValidateSandboxRelativePath.
+inline void
+ValidateWorkerDescriptionPaths(const CbObject& WorkerDescription)
+{
+    using namespace std::literals;
+
+    // "executables" and "files" are arrays of objects that carry the path in "name"
+    const auto CheckEntryNames = [&WorkerDescription](std::string_view ArrayKey) {
+        for (auto& Entry : WorkerDescription[ArrayKey])
+        {
+            ValidateSandboxRelativePath(Entry.AsObjectView()["name"sv].AsString());
+        }
+    };
+
+    // The top-level path is only checked when the field is present
+    auto TopLevelPath = WorkerDescription["path"sv];
+    if (TopLevelPath.HasValue())
+    {
+        ValidateSandboxRelativePath(TopLevelPath.AsString());
+    }
+
+    CheckEntryNames("executables"sv);
+
+    // "dirs" is an array of plain strings
+    for (auto& DirEntry : WorkerDescription["dirs"sv])
+    {
+        ValidateSandboxRelativePath(DirEntry.AsString());
+    }
+
+    CheckEntryNames("files"sv);
+}
+
+} // namespace zen::compute
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 4f116e7d8..67e12b84e 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -164,8 +164,9 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Assign any remaining actions to runners with capacity (round-robin)
- for (int i = 0; ActionIdx < Actions.size(); i = (i + 1) % RunnerCount)
+ // Assign any remaining actions to runners with capacity (round-robin).
+ // Cap at TotalCapacity to avoid spinning when there are more actions than runners can accept.
+ for (int i = 0; ActionIdx < Actions.size() && ActionIdx < TotalCapacity; i = (i + 1) % RunnerCount)
{
if (Capacities[i] > PerRunnerActions[i].size())
{
@@ -186,11 +187,12 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Reassemble results in original action order
- std::vector<SubmitResult> Results(Actions.size());
+ // Reassemble results in original action order.
+ // Actions beyond ActionIdx were not assigned to any runner (insufficient capacity).
+ std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"});
std::vector<size_t> PerRunnerIdx(RunnerCount, 0);
- for (size_t i = 0; i < Actions.size(); ++i)
+ for (size_t i = 0; i < ActionIdx; ++i)
{
size_t RunnerIdx = ActionRunnerIndex[i];
size_t Idx = PerRunnerIdx[RunnerIdx]++;
diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp
index e79a6c90f..9055005d9 100644
--- a/src/zencompute/runners/linuxrunner.cpp
+++ b/src/zencompute/runners/linuxrunner.cpp
@@ -331,6 +331,8 @@ LinuxProcessRunner::LinuxProcessRunner(ChunkResolver& Resolver,
{
ZEN_INFO("namespace sandboxing enabled for child processes");
}
+
+ StartMonitorThread();
}
SubmitResult
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index b61e0a46f..1b748c0e5 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -4,6 +4,8 @@
#if ZEN_WITH_COMPUTE_SERVICES
+# include "pathvalidation.h"
+
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/compactbinarypackage.h>
@@ -104,8 +106,6 @@ LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver,
ZEN_INFO("Cleanup complete");
}
- m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
-
# if ZEN_PLATFORM_WINDOWS
// Suppress any error dialogs caused by missing dependencies
UINT OldMode = ::SetErrorMode(0);
@@ -382,6 +382,8 @@ LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromP
const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ ValidateSandboxRelativePath(Name);
+
CompressedBuffer Compressed;
if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
@@ -457,7 +459,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
for (auto& It : WorkerDescription["dirs"sv])
{
- std::string_view Name = It.AsString();
+ std::string_view Name = It.AsString();
+ ValidateSandboxRelativePath(Name);
std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()};
// Validate dir path stays within sandbox
@@ -482,6 +485,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
}
WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
+
+ ZEN_INFO("manifested worker '{}' in '{}'", WorkerPackage.GetObjectHash(), SandboxPath);
}
CbPackage
@@ -540,6 +545,12 @@ LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
}
void
+LocalProcessRunner::StartMonitorThread()
+{
+ m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+}
+
+void
LocalProcessRunner::MonitorThreadFunction()
{
SetCurrentThreadName("LocalProcessRunner_Monitor");
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
index b8cff6826..d6589db43 100644
--- a/src/zencompute/runners/localrunner.h
+++ b/src/zencompute/runners/localrunner.h
@@ -67,6 +67,7 @@ protected:
{
Ref<RunnerAction> Action;
void* ProcessHandle = nullptr;
+ int Pid = 0;
int ExitCode = 0;
std::filesystem::path SandboxPath;
@@ -83,8 +84,6 @@ protected:
std::filesystem::path m_SandboxPath;
int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
- // if used in conjuction with m_ResultsLock, this lock must be taken *after*
- // m_ResultsLock to avoid deadlocks
RwLock m_RunningLock;
std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
@@ -95,6 +94,7 @@ protected:
std::thread m_MonitorThread;
std::atomic<bool> m_MonitorThreadEnabled{true};
Event m_MonitorThreadEvent;
+ void StartMonitorThread();
void MonitorThreadFunction();
virtual void SweepRunningActions();
virtual void CancelRunningActions();
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
index 5cec90699..c2ccca9a6 100644
--- a/src/zencompute/runners/macrunner.cpp
+++ b/src/zencompute/runners/macrunner.cpp
@@ -130,6 +130,8 @@ MacProcessRunner::MacProcessRunner(ChunkResolver& Resolver,
{
ZEN_INFO("Seatbelt sandboxing enabled for child processes");
}
+
+ StartMonitorThread();
}
SubmitResult
diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp
new file mode 100644
index 000000000..e4a7ba388
--- /dev/null
+++ b/src/zencompute/runners/managedrunner.cpp
@@ -0,0 +1,279 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "managedrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <asio/io_context.hpp>
+# include <asio/executor_work_guard.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+// Builds the runner on top of LocalProcessRunner, creates the io_context and the
+// SubprocessManager bound to it, registers the "compute-workers" process group
+// and starts kIoThreadCount threads that run the io_context.
+ManagedProcessRunner::ManagedProcessRunner(ChunkResolver&               Resolver,
+                                           const std::filesystem::path& BaseDir,
+                                           DeferredDirectoryDeleter&    Deleter,
+                                           WorkerThreadPool&            WorkerPool,
+                                           int32_t                      MaxConcurrentActions)
+: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
+, m_IoContext(std::make_unique<asio::io_context>())
+, m_SubprocessManager(std::make_unique<SubprocessManager>(*m_IoContext))
+{
+    m_ProcessGroup = m_SubprocessManager->CreateGroup("compute-workers");
+
+    // Body of each io thread. Several threads service the same io_context so
+    // that exit callbacks and metrics sampling do not block each other.
+    const auto IoThreadMain = [this](int ThreadIndex) {
+        SetCurrentThreadName(fmt::format("mrunner_{}", ThreadIndex));
+
+        // Without a work guard run() would return as soon as the queue is empty
+        auto KeepAlive = asio::make_work_guard(*m_IoContext);
+
+        m_IoContext->run();
+    };
+
+    for (int ThreadIndex = 0; ThreadIndex != kIoThreadCount; ++ThreadIndex)
+    {
+        m_IoThreads.emplace_back(IoThreadMain, ThreadIndex);
+    }
+}
+
+// Shuts the runner down on destruction. A std::exception escaping Shutdown()
+// is logged as a warning instead of propagating out of the destructor.
+ManagedProcessRunner::~ManagedProcessRunner()
+{
+    try
+    {
+        Shutdown();
+    }
+    catch (const std::exception& Err)
+    {
+        ZEN_WARN("exception during managed process runner shutdown: {}", Err.what());
+    }
+}
+
+// Stops the runner: refuses new actions, cancels everything that is running,
+// destroys the process group and SubprocessManager, then stops the io_context
+// and joins the io threads. Safe to call more than once; later calls find the
+// manager already reset and the thread list empty.
+void
+ManagedProcessRunner::Shutdown()
+{
+    ZEN_TRACE_CPU("ManagedProcessRunner::Shutdown");
+    m_AcceptNewActions = false;
+
+    CancelRunningActions();
+
+    // The SubprocessManager goes away before the io_context is stopped so that
+    // any in-flight callbacks are drained cleanly.
+    if (m_SubprocessManager)
+    {
+        m_SubprocessManager->DestroyGroup("compute-workers");
+        m_ProcessGroup = nullptr;  // the group it pointed at was just destroyed
+        m_SubprocessManager.reset();
+    }
+
+    // stop() makes run() return on every io thread so they can be joined below
+    if (m_IoContext)
+    {
+        m_IoContext->stop();
+    }
+
+    for (std::thread& IoThread : m_IoThreads)
+    {
+        if (!IoThread.joinable())
+        {
+            continue;
+        }
+        IoThread.join();
+    }
+    m_IoThreads.clear();
+}
+
+// Prepares the action's sandbox, spawns the worker executable inside the
+// "compute-workers" process group and records the action in m_RunningMap.
+//
+//   Action - the action to run
+//
+// Returns IsAccepted=false when preparation fails or the process cannot be
+// spawned (the sandbox is then handed to the deferred deleter), otherwise
+// IsAccepted=true with the action marked Running.
+SubmitResult
+ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+    ZEN_TRACE_CPU("ManagedProcessRunner::SubmitAction");
+    std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);
+
+    if (!Prepared)
+    {
+        return SubmitResult{.IsAccepted = false};
+    }
+
+    CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();
+
+    // Parse environment variables from worker descriptor ("KEY=VALUE" strings)
+    // into the key-value pairs expected by CreateProcOptions. Entries without
+    // an '=' are skipped.
+    std::vector<std::pair<std::string, std::string>> EnvPairs;
+    for (auto& It : WorkerDescription["environment"sv])
+    {
+        std::string_view Str = It.AsString();
+        size_t           Eq  = Str.find('=');
+        if (Eq != std::string_view::npos)
+        {
+            EnvPairs.emplace_back(std::string(Str.substr(0, Eq)), std::string(Str.substr(Eq + 1)));
+        }
+    }
+
+    // Build command line
+    std::string_view      ExecPath = WorkerDescription["path"sv].AsString();
+    std::filesystem::path ExePath  = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred();
+
+    std::string CommandLine = fmt::format("\"{}\" -Build=build.action"sv, ExePath.string());
+
+    ZEN_DEBUG("Executing (managed): '{}' (sandbox='{}')", CommandLine, Prepared->SandboxPath);
+
+    CreateProcOptions Options;
+    Options.WorkingDirectory = &Prepared->SandboxPath;
+    Options.Flags            = CreateProcOptions::Flag_NoConsole;
+    Options.Environment      = std::move(EnvPairs);
+
+    const int32_t ActionLsn = Prepared->ActionLsn;
+    int           Pid       = 0;
+
+    {
+        // Hold the running-map lock across spawn AND registration. The exit
+        // callback runs on an io thread and OnProcessExit() starts by taking this
+        // lock exclusively, so a process that exits immediately cannot be
+        // reported before its entry exists (which would leave the action in the
+        // map forever), and Proc is not touched after the callback has finished.
+        // NOTE(review): relies on the exit callback being posted to an io thread
+        // rather than invoked synchronously from Spawn() — confirm.
+        RwLock::ExclusiveLockScope _(m_RunningLock);
+
+        ManagedProcess* Proc = nullptr;
+
+        try
+        {
+            Proc = m_ProcessGroup->Spawn(ExePath, CommandLine, Options, [this, ActionLsn](ManagedProcess& /*Process*/, int ExitCode) {
+                OnProcessExit(ActionLsn, ExitCode);
+            });
+        }
+        catch (std::exception& Ex)
+        {
+            ZEN_ERROR("Failed to spawn process for action LSN {}: {}", ActionLsn, Ex.what());
+            m_DeferredDeleter.Enqueue(ActionLsn, std::move(Prepared->SandboxPath));
+            return SubmitResult{.IsAccepted = false};
+        }
+
+        Ref<RunningAction> NewAction{new RunningAction()};
+        NewAction->Action        = Action;
+        NewAction->ProcessHandle = static_cast<void*>(Proc);
+        NewAction->Pid           = Proc->Pid();
+        NewAction->SandboxPath   = std::move(Prepared->SandboxPath);
+
+        // Remember the pid for logging; Proc must not be used once the lock is released
+        Pid = NewAction->Pid;
+
+        m_RunningMap[ActionLsn] = std::move(NewAction);
+    }
+
+    Action->SetActionState(RunnerAction::State::Running);
+
+    ZEN_DEBUG("Managed runner: action LSN {} -> PID {}", ActionLsn, Pid);
+
+    return SubmitResult{.IsAccepted = true};
+}
+
+// Exit callback for a spawned worker process.
+//
+//   ActionLsn - LSN the process was registered under in m_RunningMap
+//   ExitCode  - exit code reported for the process
+//
+// Removes the action from the running map, records its exit code and final CPU
+// figures, and hands it to ProcessCompletedActions(). Does nothing when the LSN
+// is no longer in the map.
+void
+ManagedProcessRunner::OnProcessExit(int ActionLsn, int ExitCode)
+{
+    ZEN_TRACE_CPU("ManagedProcessRunner::OnProcessExit");
+
+    Ref<RunningAction> Finished;
+
+    {
+        // Claim the entry; whoever erases it from the map owns its completion
+        RwLock::ExclusiveLockScope _(m_RunningLock);
+
+        auto Found = m_RunningMap.find(ActionLsn);
+        if (Found != m_RunningMap.end())
+        {
+            Finished = std::move(Found->second);
+            m_RunningMap.erase(Found);
+        }
+    }
+
+    if (!Finished)
+    {
+        return;
+    }
+
+    ZEN_DEBUG("Managed runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), ActionLsn, Finished->Pid, ExitCode);
+
+    Finished->ExitCode = ExitCode;
+
+    // Capture final CPU metrics from the managed process before it is removed.
+    if (auto* Proc = static_cast<ManagedProcess*>(Finished->ProcessHandle))
+    {
+        const ProcessMetrics Metrics   = Proc->GetLatestMetrics();
+        const float          TotalCpuMs = static_cast<float>(Metrics.UserTimeMs + Metrics.KernelTimeMs);
+        Finished->Action->CpuSeconds.store(TotalCpuMs / 1000.0f, std::memory_order_relaxed);
+
+        // A negative percentage is not stored
+        const float UsagePercent = Proc->GetCpuUsagePercent();
+        if (UsagePercent >= 0.0f)
+        {
+            Finished->Action->CpuUsagePercent.store(UsagePercent, std::memory_order_relaxed);
+        }
+    }
+
+    Finished->ProcessHandle = nullptr;
+
+    std::vector<Ref<RunningAction>> Completed;
+    Completed.push_back(std::move(Finished));
+    ProcessCompletedActions(Completed);
+}
+
+// Cancels every running action: takes the whole running map, kills all
+// processes in the group, queues each sandbox for deferred deletion and marks
+// each action Failed. Returns immediately when nothing is running.
+void
+ManagedProcessRunner::CancelRunningActions()
+{
+    ZEN_TRACE_CPU("ManagedProcessRunner::CancelRunningActions");
+
+    // Take ownership of all entries under the lock; exit callbacks that arrive
+    // afterwards find an empty map and return without doing anything.
+    std::unordered_map<int, Ref<RunningAction>> Detached;
+    {
+        RwLock::ExclusiveLockScope _(m_RunningLock);
+        std::swap(Detached, m_RunningMap);
+    }
+
+    if (Detached.empty())
+    {
+        return;
+    }
+
+    ZEN_INFO("cancelling {} running actions via process group", Detached.size());
+
+    Stopwatch Timer;
+
+    // Kill all processes in the group atomically (TerminateJobObject on Windows,
+    // SIGTERM+SIGKILL on POSIX).
+    if (m_ProcessGroup)
+    {
+        m_ProcessGroup->KillAll();
+    }
+
+    for (auto& Entry : Detached)
+    {
+        Ref<RunningAction>& Cancelled = Entry.second;
+
+        m_DeferredDeleter.Enqueue(Cancelled->Action->ActionLsn, std::move(Cancelled->SandboxPath));
+        Cancelled->Action->SetActionState(RunnerAction::State::Failed);
+    }
+
+    ZEN_INFO("DONE - cancelled {} running processes (took {})", Detached.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+// Requests cancellation of a single running action.
+//
+//   ActionLsn - LSN of the action to cancel
+//
+// Returns true when the action was found with a live process handle and
+// termination was requested, false otherwise. Completion is handled
+// asynchronously by the exit callback.
+bool
+ManagedProcessRunner::CancelAction(int ActionLsn)
+{
+    ZEN_TRACE_CPU("ManagedProcessRunner::CancelAction");
+
+    bool Initiated = false;
+
+    // Terminate while the shared lock is held. OnProcessExit() removes the entry
+    // under the exclusive lock before it finishes with the process, so as long
+    // as the entry is visible here the exit callback has not completed and the
+    // ManagedProcess pointer has not been retired. Using the pointer after
+    // releasing the lock would race with that callback.
+    // NOTE(review): assumes the exit callback is posted to an io thread and not
+    // invoked synchronously from Terminate() — confirm.
+    m_RunningLock.WithSharedLock([&] {
+        auto It = m_RunningMap.find(ActionLsn);
+        if (It == m_RunningMap.end() || It->second->ProcessHandle == nullptr)
+        {
+            return;
+        }
+
+        // Terminate the process. The exit callback will handle the rest
+        // (remove from running map, gather outputs or mark failed).
+        static_cast<ManagedProcess*>(It->second->ProcessHandle)->Terminate(222);
+        Initiated = true;
+    });
+
+    if (!Initiated)
+    {
+        return false;
+    }
+
+    ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn);
+    return true;
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/managedrunner.h b/src/zencompute/runners/managedrunner.h
new file mode 100644
index 000000000..21a44d43c
--- /dev/null
+++ b/src/zencompute/runners/managedrunner.h
@@ -0,0 +1,64 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenutil/process/subprocessmanager.h>
+
+# include <memory>
+
+namespace asio {
+class io_context;
+}
+
+namespace zen::compute {
+
+/** Cross-platform process runner backed by SubprocessManager.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and shared action preparation. Replaces the polling-based
+ monitor thread with async exit callbacks driven by SubprocessManager, and
+ delegates CPU/memory metrics sampling to the manager's built-in round-robin
+ sampler.
+
+ A ProcessGroup (backed by a JobObject on Windows, process group on POSIX) is
+ used for bulk cancellation on shutdown.
+
+ This runner does not perform any platform-specific sandboxing (AppContainer,
+ namespaces, Seatbelt). It is intended as a simpler, cross-platform alternative
+ to the platform-specific runners for non-sandboxed workloads.
+ */
+class ManagedProcessRunner : public LocalProcessRunner
+{
+public:
+ ManagedProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ int32_t MaxConcurrentActions = 0); // creates the io_context, manager and process group, then starts the io threads
+ ~ManagedProcessRunner(); // calls Shutdown(); a std::exception from it is logged, not propagated
+
+ void Shutdown() override; // cancels running actions, destroys the process group and manager, stops and joins the io threads
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override; // spawns the worker in the process group and records it in the running map
+ void CancelRunningActions() override; // kills the whole process group and marks every running action Failed
+ bool CancelAction(int ActionLsn) override; // terminates one action's process; false if the LSN has no running process
+ [[nodiscard]] bool IsHealthy() override { return true; } // always reports healthy
+
+private:
+ static constexpr int kIoThreadCount = 4; // number of threads running m_IoContext
+
+ // Exit callback posted on an io_context thread.
+ void OnProcessExit(int ActionLsn, int ExitCode);
+
+ std::unique_ptr<asio::io_context> m_IoContext; // keep declared before m_SubprocessManager, which is constructed from *m_IoContext
+ std::unique_ptr<SubprocessManager> m_SubprocessManager; // reset in Shutdown()
+ ProcessGroup* m_ProcessGroup = nullptr; // non-owning; obtained from m_SubprocessManager, nulled in Shutdown()
+ std::vector<std::thread> m_IoThreads; // joined and cleared in Shutdown()
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index cd4b646e9..92ee65c2d 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -21,6 +21,12 @@ ZEN_THIRD_PARTY_INCLUDES_START
# include <sddl.h>
ZEN_THIRD_PARTY_INCLUDES_END
+// JOB_OBJECT_UILIMIT_ERRORMODE is defined in winuser.h which may be
+// excluded by WIN32_LEAN_AND_MEAN.
+# if !defined(JOB_OBJECT_UILIMIT_ERRORMODE)
+# define JOB_OBJECT_UILIMIT_ERRORMODE 0x00000400
+# endif
+
namespace zen::compute {
using namespace std::literals;
@@ -34,38 +40,65 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_Sandboxed(Sandboxed)
{
- if (!m_Sandboxed)
+ // Create a job object shared by all child processes. Restricting the
+ // error-mode UI prevents crash dialogs (WER / Dr. Watson) from
+ // blocking the monitor thread when a worker process terminates
+ // abnormally.
+ m_JobObject = CreateJobObjectW(nullptr, nullptr);
+ if (m_JobObject)
{
- return;
+ JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{};
+ ExtLimits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION;
+ SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits));
+
+ JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{};
+ UiRestrictions.UIRestrictionsClass = JOB_OBJECT_UILIMIT_ERRORMODE;
+ SetInformationJobObject(m_JobObject, JobObjectBasicUIRestrictions, &UiRestrictions, sizeof(UiRestrictions));
+
+ // Set error mode on this process so children inherit it. The
+ // UILIMIT_ERRORMODE restriction above prevents them from clearing
+ // SEM_NOGPFAULTERRORBOX.
+ SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX);
}
- // Build a unique profile name per process to avoid collisions
- m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId());
+ if (m_Sandboxed)
+ {
+ // Build a unique profile name per process to avoid collisions
+ m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId());
- // Clean up any stale profile from a previous crash
- DeleteAppContainerProfile(m_AppContainerName.c_str());
+ // Clean up any stale profile from a previous crash
+ DeleteAppContainerProfile(m_AppContainerName.c_str());
- PSID Sid = nullptr;
+ PSID Sid = nullptr;
- HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(),
- m_AppContainerName.c_str(), // display name
- m_AppContainerName.c_str(), // description
- nullptr, // no capabilities
- 0, // capability count
- &Sid);
+ HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(),
+ m_AppContainerName.c_str(), // display name
+ m_AppContainerName.c_str(), // description
+ nullptr, // no capabilities
+ 0, // capability count
+ &Sid);
- if (FAILED(Hr))
- {
- throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr));
- }
+ if (FAILED(Hr))
+ {
+ throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr));
+ }
- m_AppContainerSid = Sid;
+ m_AppContainerSid = Sid;
+
+ ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName));
+ }
- ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName));
+ StartMonitorThread();
}
WindowsProcessRunner::~WindowsProcessRunner()
{
+ if (m_JobObject)
+ {
+ CloseHandle(m_JobObject);
+ m_JobObject = nullptr;
+ }
+
if (m_AppContainerSid)
{
FreeSid(m_AppContainerSid);
@@ -172,9 +205,9 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
BOOL bInheritHandles = FALSE;
- DWORD dwCreationFlags = DETACHED_PROCESS;
+ DWORD dwCreationFlags = CREATE_SUSPENDED | DETACHED_PROCESS;
- ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed);
+ ZEN_DEBUG("{}: '{}' (sandbox='{}')", m_Sandboxed ? "Sandboxing" : "Executing", WideToUtf8(CommandLine.c_str()), Prepared->SandboxPath);
CommandLine.EnsureNulTerminated();
@@ -260,14 +293,21 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
}
}
- CloseHandle(ProcessInformation.hThread);
+ if (m_JobObject)
+ {
+ AssignProcessToJobObject(m_JobObject, ProcessInformation.hProcess);
+ }
- Ref<RunningAction> NewAction{new RunningAction()};
- NewAction->Action = Action;
- NewAction->ProcessHandle = ProcessInformation.hProcess;
- NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+ ResumeThread(ProcessInformation.hThread);
+ CloseHandle(ProcessInformation.hThread);
{
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = ProcessInformation.hProcess;
+ NewAction->Pid = ProcessInformation.dwProcessId;
+ NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+
RwLock::ExclusiveLockScope _(m_RunningLock);
m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
@@ -275,6 +315,8 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
Action->SetActionState(RunnerAction::State::Running);
+ ZEN_DEBUG("Local runner: action LSN {} -> PID {}", Action->ActionLsn, ProcessInformation.dwProcessId);
+
return SubmitResult{.IsAccepted = true};
}
@@ -294,6 +336,11 @@ WindowsProcessRunner::SweepRunningActions()
if (IsSuccess && ExitCode != STILL_ACTIVE)
{
+ ZEN_DEBUG("Local runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"),
+ Running->Action->ActionLsn,
+ Running->Pid,
+ ExitCode);
+
CloseHandle(Running->ProcessHandle);
Running->ProcessHandle = INVALID_HANDLE_VALUE;
Running->ExitCode = ExitCode;
diff --git a/src/zencompute/runners/windowsrunner.h b/src/zencompute/runners/windowsrunner.h
index 9f2385cc4..adeaf02fc 100644
--- a/src/zencompute/runners/windowsrunner.h
+++ b/src/zencompute/runners/windowsrunner.h
@@ -46,6 +46,7 @@ private:
bool m_Sandboxed = false;
PSID m_AppContainerSid = nullptr;
std::wstring m_AppContainerName;
+ HANDLE m_JobObject = nullptr;
};
} // namespace zen::compute
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
index 506bec73b..b4fafb467 100644
--- a/src/zencompute/runners/winerunner.cpp
+++ b/src/zencompute/runners/winerunner.cpp
@@ -36,6 +36,8 @@ WineProcessRunner::WineProcessRunner(ChunkResolver& Resolver,
sigemptyset(&Action.sa_mask);
Action.sa_handler = SIG_DFL;
sigaction(SIGCHLD, &Action, nullptr);
+
+ StartMonitorThread();
}
SubmitResult