aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zencompute/runners
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zencompute/runners')
-rw-r--r--src/zencompute/runners/deferreddeleter.cpp340
-rw-r--r--src/zencompute/runners/deferreddeleter.h68
-rw-r--r--src/zencompute/runners/functionrunner.cpp365
-rw-r--r--src/zencompute/runners/functionrunner.h214
-rw-r--r--src/zencompute/runners/linuxrunner.cpp734
-rw-r--r--src/zencompute/runners/linuxrunner.h44
-rw-r--r--src/zencompute/runners/localrunner.cpp674
-rw-r--r--src/zencompute/runners/localrunner.h138
-rw-r--r--src/zencompute/runners/macrunner.cpp491
-rw-r--r--src/zencompute/runners/macrunner.h43
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp618
-rw-r--r--src/zencompute/runners/remotehttprunner.h100
-rw-r--r--src/zencompute/runners/windowsrunner.cpp460
-rw-r--r--src/zencompute/runners/windowsrunner.h53
-rw-r--r--src/zencompute/runners/winerunner.cpp237
-rw-r--r--src/zencompute/runners/winerunner.h37
16 files changed, 4616 insertions, 0 deletions
diff --git a/src/zencompute/runners/deferreddeleter.cpp b/src/zencompute/runners/deferreddeleter.cpp
new file mode 100644
index 000000000..4fad2cf70
--- /dev/null
+++ b/src/zencompute/runners/deferreddeleter.cpp
@@ -0,0 +1,340 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "deferreddeleter.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/thread.h>
+
+# include <algorithm>
+# include <chrono>
+
+namespace zen::compute {
+
+using namespace std::chrono_literals;
+
+using Clock = std::chrono::steady_clock;
+
+// Default deferral: how long to wait before attempting deletion.
+// This gives memory-mapped file handles time to close naturally.
+static constexpr auto DeferralPeriod = 60s;
+
+// Shortened deferral after MarkReady(): the client has collected results
+// so handles should be released soon, but we still wait briefly.
+static constexpr auto ReadyGracePeriod = 5s;
+
+// Interval between retry attempts for directories that failed deletion.
+static constexpr auto RetryInterval = 5s;
+
+static constexpr int MaxRetries = 10;
+
// Starts the background cleanup thread immediately; the thread blocks on the
// condition variable until work is enqueued or shutdown is requested.
DeferredDirectoryDeleter::DeferredDirectoryDeleter() : m_Thread(&DeferredDirectoryDeleter::ThreadFunction, this)
{
}

// Ensures the worker thread is drained and joined even when Shutdown() was
// never called explicitly; Shutdown() is idempotent so a prior call is harmless.
DeferredDirectoryDeleter::~DeferredDirectoryDeleter()
{
	Shutdown();
}
+
+void
+DeferredDirectoryDeleter::Enqueue(int ActionLsn, std::filesystem::path Path)
+{
+ {
+ std::lock_guard Lock(m_Mutex);
+ m_Queue.push_back({ActionLsn, std::move(Path)});
+ }
+ m_Cv.notify_one();
+}
+
+void
+DeferredDirectoryDeleter::MarkReady(int ActionLsn)
+{
+ {
+ std::lock_guard Lock(m_Mutex);
+ m_ReadyLsns.push_back(ActionLsn);
+ }
+ m_Cv.notify_one();
+}
+
+void
+DeferredDirectoryDeleter::Shutdown()
+{
+ {
+ std::lock_guard Lock(m_Mutex);
+ m_Done = true;
+ }
+ m_Cv.notify_one();
+
+ if (m_Thread.joinable())
+ {
+ m_Thread.join();
+ }
+}
+
// Worker-thread main loop.
//
// Each iteration: (1) under the lock, sleep until new work / a MarkReady signal /
// shutdown, or until the earliest pending deadline; (2) move queued entries into
// the local PendingList and apply MarkReady deadline reductions; (3) outside the
// lock, attempt deletion of every entry whose deadline has elapsed (or all
// entries once shutdown was requested), with bounded retries; (4) exit when
// shutdown was requested and nothing remains.
void
DeferredDirectoryDeleter::ThreadFunction()
{
	SetCurrentThreadName("ZenDirCleanup");

	// Worker-local copy of an enqueued directory plus its deletion deadline and
	// retry bookkeeping. Only this thread touches PendingList, so no locking is
	// needed for it.
	struct PendingEntry
	{
		int                   ActionLsn;
		std::filesystem::path Path;
		Clock::time_point     ReadyTime;
		int                   Attempts = 0;
	};

	std::vector<PendingEntry> PendingList;

	// Success is "remove_all reported no error"; a non-existent path also counts
	// as success (remove_all with error_code does not set an error for that).
	auto TryDelete = [](PendingEntry& Entry) -> bool {
		std::error_code Ec;
		std::filesystem::remove_all(Entry.Path, Ec);
		return !Ec;
	};

	for (;;)
	{
		bool Shutting = false;

		// Drain the incoming queue and process MarkReady signals

		{
			std::unique_lock Lock(m_Mutex);

			if (m_Queue.empty() && m_ReadyLsns.empty() && !m_Done)
			{
				if (PendingList.empty())
				{
					// Nothing pending: sleep until there is any work at all.
					m_Cv.wait(Lock, [this] { return !m_Queue.empty() || !m_ReadyLsns.empty() || m_Done; });
				}
				else
				{
					// Sleep only until the earliest pending deadline so elapsed
					// entries are processed promptly.
					auto NextReady = PendingList.front().ReadyTime;
					for (const auto& Entry : PendingList)
					{
						if (Entry.ReadyTime < NextReady)
						{
							NextReady = Entry.ReadyTime;
						}
					}

					m_Cv.wait_until(Lock, NextReady, [this] { return !m_Queue.empty() || !m_ReadyLsns.empty() || m_Done; });
				}
			}

			// Move new items into PendingList with the full deferral deadline
			auto Now = Clock::now();
			for (auto& Entry : m_Queue)
			{
				PendingList.push_back({Entry.ActionLsn, std::move(Entry.Path), Now + DeferralPeriod, 0});
			}
			m_Queue.clear();

			// Apply MarkReady: shorten ReadyTime for matching entries
			for (int Lsn : m_ReadyLsns)
			{
				for (auto& Entry : PendingList)
				{
					if (Entry.ActionLsn == Lsn)
					{
						// Only ever move the deadline earlier, never later.
						auto NewReady = Now + ReadyGracePeriod;
						if (NewReady < Entry.ReadyTime)
						{
							Entry.ReadyTime = NewReady;
						}
					}
				}
			}
			m_ReadyLsns.clear();

			Shutting = m_Done;
		}

		// Process items whose deferral period has elapsed (or all items on shutdown)

		auto Now = Clock::now();

		for (size_t i = 0; i < PendingList.size();)
		{
			auto& Entry = PendingList[i];

			if (!Shutting && Now < Entry.ReadyTime)
			{
				++i;
				continue;
			}

			if (TryDelete(Entry))
			{
				if (Entry.Attempts > 0)
				{
					ZEN_INFO("Retry succeeded for directory '{}'", Entry.Path);
				}

				// Swap-and-pop removal; order of PendingList is not significant.
				PendingList[i] = std::move(PendingList.back());
				PendingList.pop_back();
			}
			else
			{
				++Entry.Attempts;

				if (Entry.Attempts >= MaxRetries)
				{
					ZEN_WARN("Giving up on deleting '{}' after {} attempts", Entry.Path, Entry.Attempts);
					PendingList[i] = std::move(PendingList.back());
					PendingList.pop_back();
				}
				else
				{
					// NOTE(review): during shutdown the deadline check above is
					// bypassed, so these retries happen back-to-back without
					// honouring RetryInterval — confirm that rapid-fire retries
					// at shutdown are intended.
					ZEN_WARN("Unable to delete directory '{}' (attempt {}), will retry", Entry.Path, Entry.Attempts);
					Entry.ReadyTime = Now + RetryInterval;
					++i;
				}
			}
		}

		// Exit once shutdown is requested and nothing remains

		if (Shutting && PendingList.empty())
		{
			return;
		}
	}
}
+
+} // namespace zen::compute
+
+#endif
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/testing.h>
+
+namespace zen::compute {
+
// Intentionally empty: referencing this symbol from another translation unit
// forces the linker to keep this object file (and its test cases) in test builds.
void
deferreddeleter_forcelink()
{
}
+
+} // namespace zen::compute
+
+#endif
+
+#if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/testutils.h>
+
+namespace zen::compute {
+
TEST_SUITE_BEGIN("compute.deferreddeleter");

// Destroying the deleter runs Shutdown(), which deletes all pending
// directories immediately regardless of the 60s deferral period.
TEST_CASE("DeferredDirectoryDeleter.DeletesSingleDirectory")
{
	ScopedTemporaryDirectory TempDir;
	std::filesystem::path    DirToDelete = TempDir.Path() / "subdir";
	CreateDirectories(DirToDelete / "nested");

	CHECK(std::filesystem::exists(DirToDelete));

	{
		DeferredDirectoryDeleter Deleter;
		Deleter.Enqueue(1, DirToDelete);
	}

	CHECK(!std::filesystem::exists(DirToDelete));
}

// Multiple enqueued directories (distinct LSNs) are all removed on destruction.
TEST_CASE("DeferredDirectoryDeleter.DeletesMultipleDirectories")
{
	ScopedTemporaryDirectory TempDir;

	constexpr int NumDirs = 10;
	std::vector<std::filesystem::path> Dirs;

	for (int i = 0; i < NumDirs; ++i)
	{
		auto Dir = TempDir.Path() / std::to_string(i);
		CreateDirectories(Dir / "child");
		Dirs.push_back(std::move(Dir));
	}

	{
		DeferredDirectoryDeleter Deleter;
		for (int i = 0; i < NumDirs; ++i)
		{
			CHECK(std::filesystem::exists(Dirs[i]));
			Deleter.Enqueue(100 + i, Dirs[i]);
		}
	}

	for (const auto& Dir : Dirs)
	{
		CHECK(!std::filesystem::exists(Dir));
	}
}

// A second Shutdown() call (and the destructor's implicit one) must be a no-op.
TEST_CASE("DeferredDirectoryDeleter.ShutdownIsIdempotent")
{
	ScopedTemporaryDirectory TempDir;
	std::filesystem::path    Dir = TempDir.Path() / "idempotent";
	CreateDirectories(Dir);

	DeferredDirectoryDeleter Deleter;
	Deleter.Enqueue(42, Dir);
	Deleter.Shutdown();
	Deleter.Shutdown();

	CHECK(!std::filesystem::exists(Dir));
}

// Enqueueing a path that does not exist must not hang or crash; remove_all
// on a missing path is not an error.
TEST_CASE("DeferredDirectoryDeleter.HandlesNonExistentPath")
{
	ScopedTemporaryDirectory TempDir;
	std::filesystem::path    NoSuchDir = TempDir.Path() / "does_not_exist";

	{
		DeferredDirectoryDeleter Deleter;
		Deleter.Enqueue(99, NoSuchDir);
	}
}

// Explicit Shutdown() before the destructor deletes the directory synchronously.
TEST_CASE("DeferredDirectoryDeleter.ExplicitShutdownBeforeDestruction")
{
	ScopedTemporaryDirectory TempDir;
	std::filesystem::path    Dir = TempDir.Path() / "explicit";
	CreateDirectories(Dir / "inner");

	DeferredDirectoryDeleter Deleter;
	Deleter.Enqueue(7, Dir);
	Deleter.Shutdown();

	CHECK(!std::filesystem::exists(Dir));
}

// MarkReady() shortens the deferral for the matching LSN; combined with
// Shutdown() the directory must be gone once Shutdown() returns.
TEST_CASE("DeferredDirectoryDeleter.MarkReadyShortensDeferral")
{
	ScopedTemporaryDirectory TempDir;
	std::filesystem::path    Dir = TempDir.Path() / "markready";
	CreateDirectories(Dir / "child");

	DeferredDirectoryDeleter Deleter;
	Deleter.Enqueue(50, Dir);

	// Without MarkReady the full deferral (60s) would apply.
	// MarkReady shortens it to 5s, and shutdown bypasses even that.
	Deleter.MarkReady(50);
	Deleter.Shutdown();

	CHECK(!std::filesystem::exists(Dir));
}

TEST_SUITE_END();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/runners/deferreddeleter.h b/src/zencompute/runners/deferreddeleter.h
new file mode 100644
index 000000000..9b010aa0f
--- /dev/null
+++ b/src/zencompute/runners/deferreddeleter.h
@@ -0,0 +1,68 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <condition_variable>
+# include <deque>
+# include <filesystem>
+# include <mutex>
+# include <thread>
+# include <vector>
+
+namespace zen::compute {
+
/// Deletes directories on a background thread to avoid blocking callers.
/// Useful when DeleteDirectories may stall (e.g. Wine's deferred-unlink semantics).
///
/// Enqueued directories wait for a deferral period before deletion, giving
/// file handles time to close. Call MarkReady() with the ActionLsn to shorten
/// the wait to a brief grace period (e.g. once a client has collected results).
/// On shutdown, all pending directories are deleted immediately.
class DeferredDirectoryDeleter
{
	DeferredDirectoryDeleter(const DeferredDirectoryDeleter&) = delete;
	DeferredDirectoryDeleter& operator=(const DeferredDirectoryDeleter&) = delete;

public:
	DeferredDirectoryDeleter();
	~DeferredDirectoryDeleter();

	/// Enqueue a directory for deferred deletion, associated with an action LSN.
	void Enqueue(int ActionLsn, std::filesystem::path Path);

	/// Signal that the action result has been consumed and the directory
	/// can be deleted after a short grace period instead of the full deferral.
	void MarkReady(int ActionLsn);

	/// Drain the queue and join the background thread. Idempotent.
	void Shutdown();

private:
	// A directory awaiting deletion, keyed by the action LSN it belongs to.
	struct QueueEntry
	{
		int                   ActionLsn;
		std::filesystem::path Path;
	};

	// m_Mutex guards m_Queue, m_ReadyLsns and m_Done; m_Cv is signalled after
	// any of them changes.
	std::mutex              m_Mutex;
	std::condition_variable m_Cv;
	std::deque<QueueEntry>  m_Queue;
	std::vector<int>        m_ReadyLsns;
	bool                    m_Done = false;
	// Declared last on purpose: members are initialized in declaration order,
	// so the thread only starts once everything it touches is constructed.
	std::thread             m_Thread;
	void                    ThreadFunction();
};
+
+} // namespace zen::compute
+
+#endif
+
+#if ZEN_WITH_TESTS
+namespace zen::compute {
+void deferreddeleter_forcelink(); // internal
+} // namespace zen::compute
+#endif
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
new file mode 100644
index 000000000..768cdf1e1
--- /dev/null
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -0,0 +1,365 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/filesystem.h>
+# include <zencore/trace.h>
+
+# include <fmt/format.h>
+# include <vector>
+
+namespace zen::compute {
+
// Base-class setup: dumped action payloads (see MaybeDumpAction) are persisted
// under "<BasePath>/actions".
FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions")
{
}

// Out-of-line definition of the pure virtual destructor declared "= 0" in the
// header; required so derived runners can be destroyed through the base class.
FunctionRunner::~FunctionRunner() = default;

// Default capacity for a runner that does not override this: one concurrent action.
size_t
FunctionRunner::QueryCapacity()
{
	return 1;
}
+
+std::vector<SubmitResult>
+FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<SubmitResult> Results;
+ Results.reserve(Actions.size());
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Results.push_back(SubmitAction(Action));
+ }
+
+ return Results;
+}
+
+void
+FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject)
+{
+ if (m_DumpActions)
+ {
+ std::string UniqueId = fmt::format("{}.ddb", ActionLsn);
+ std::filesystem::path Path = m_ActionsPath / UniqueId;
+
+ zen::WriteFile(Path, IoBuffer(ActionObject.GetBuffer().AsIoBuffer()));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
// Append a runner to the group under the exclusive lock, taking a
// reference-counted ownership share.
void
BaseRunnerGroup::AddRunnerInternal(FunctionRunner* Runner)
{
	m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); });
}

// Aggregate capacity of the group: sum of every runner's self-reported capacity.
size_t
BaseRunnerGroup::QueryCapacity()
{
	size_t TotalCapacity = 0;
	m_RunnersLock.WithSharedLock([&] {
		for (const auto& Runner : m_Runners)
		{
			TotalCapacity += Runner->QueryCapacity();
		}
	});
	return TotalCapacity;
}
+
+SubmitResult
+BaseRunnerGroup::SubmitAction(Ref<RunnerAction> Action)
+{
+ ZEN_TRACE_CPU("BaseRunnerGroup::SubmitAction");
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire);
+ int Index = InitialIndex;
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+ if (RunnerCount == 0)
+ {
+ return {.IsAccepted = false, .Reason = "No runners available"};
+ }
+
+ do
+ {
+ while (Index >= RunnerCount)
+ {
+ Index -= RunnerCount;
+ }
+
+ auto& Runner = m_Runners[Index++];
+
+ SubmitResult Result = Runner->SubmitAction(Action);
+
+ if (Result.IsAccepted == true)
+ {
+ m_NextSubmitIndex = Index % RunnerCount;
+
+ return Result;
+ }
+
+ while (Index >= RunnerCount)
+ {
+ Index -= RunnerCount;
+ }
+ } while (Index != InitialIndex);
+
+ return {.IsAccepted = false};
+}
+
+std::vector<SubmitResult>
+BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+ if (RunnerCount == 0)
+ {
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
+ }
+
+ // Query capacity per runner and compute total
+ std::vector<size_t> Capacities(RunnerCount);
+ size_t TotalCapacity = 0;
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ Capacities[i] = m_Runners[i]->QueryCapacity();
+ TotalCapacity += Capacities[i];
+ }
+
+ if (TotalCapacity == 0)
+ {
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"});
+ }
+
+ // Distribute actions across runners proportionally to their available capacity
+ std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
+ std::vector<size_t> ActionRunnerIndex(Actions.size());
+ size_t ActionIdx = 0;
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (Capacities[i] == 0)
+ {
+ continue;
+ }
+
+ size_t Share = (Actions.size() * Capacities[i] + TotalCapacity - 1) / TotalCapacity;
+ Share = std::min(Share, Capacities[i]);
+
+ for (size_t j = 0; j < Share && ActionIdx < Actions.size(); ++j, ++ActionIdx)
+ {
+ PerRunnerActions[i].push_back(Actions[ActionIdx]);
+ ActionRunnerIndex[ActionIdx] = i;
+ }
+ }
+
+ // Assign any remaining actions to runners with capacity (round-robin)
+ for (int i = 0; ActionIdx < Actions.size(); i = (i + 1) % RunnerCount)
+ {
+ if (Capacities[i] > PerRunnerActions[i].size())
+ {
+ PerRunnerActions[i].push_back(Actions[ActionIdx]);
+ ActionRunnerIndex[ActionIdx] = i;
+ ++ActionIdx;
+ }
+ }
+
+ // Submit batches per runner
+ std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+ }
+ }
+
+ // Reassemble results in original action order
+ std::vector<SubmitResult> Results(Actions.size());
+ std::vector<size_t> PerRunnerIdx(RunnerCount, 0);
+
+ for (size_t i = 0; i < Actions.size(); ++i)
+ {
+ size_t RunnerIdx = ActionRunnerIndex[i];
+ size_t Idx = PerRunnerIdx[RunnerIdx]++;
+ Results[i] = std::move(PerRunnerResults[RunnerIdx][Idx]);
+ }
+
+ return Results;
+}
+
+size_t
+BaseRunnerGroup::GetSubmittedActionCount()
+{
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ size_t TotalCount = 0;
+
+ for (const auto& Runner : m_Runners)
+ {
+ TotalCount += Runner->GetSubmittedActionCount();
+ }
+
+ return TotalCount;
+}
+
+void
+BaseRunnerGroup::RegisterWorker(CbPackage Worker)
+{
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ Runner->RegisterWorker(Worker);
+ }
+}
+
+void
+BaseRunnerGroup::Shutdown()
+{
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ Runner->Shutdown();
+ }
+}
+
+bool
+BaseRunnerGroup::CancelAction(int ActionLsn)
+{
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ if (Runner->CancelAction(ActionLsn))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void
+BaseRunnerGroup::CancelRemoteQueue(int QueueId)
+{
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ Runner->CancelRemoteQueue(QueueId);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
// An action starts life in State::New; stamp the creation time immediately so
// end-to-end latency can be derived from the Timestamps array later.
RunnerAction::RunnerAction(ComputeServiceSession* OwnerSession) : m_OwnerSession(OwnerSession)
{
	this->Timestamps[static_cast<int>(State::New)] = DateTime::Now().GetTicks();
}

RunnerAction::~RunnerAction()
{
}
+
// Return a Failed or Abandoned action to the Pending state so the scheduler can
// retry it. Returns false when the action is in any other state, or when a
// concurrent state change wins the CAS. On success, timestamps from Submitting
// onwards are cleared, per-execution telemetry is reset, RetryCount is bumped,
// and the action is re-posted to the owning session.
bool
RunnerAction::ResetActionStateToPending()
{
	// Only allow reset from Failed or Abandoned states
	State CurrentState = m_ActionState.load();

	if (CurrentState != State::Failed && CurrentState != State::Abandoned)
	{
		return false;
	}

	// CAS so a concurrent transition between the load above and here loses
	// gracefully instead of clobbering the newer state.
	if (!m_ActionState.compare_exchange_strong(CurrentState, State::Pending))
	{
		return false;
	}

	// Clear timestamps from Submitting through _Count
	for (int i = static_cast<int>(State::Submitting); i < static_cast<int>(State::_Count); ++i)
	{
		this->Timestamps[i] = 0;
	}

	// Record new Pending timestamp
	this->Timestamps[static_cast<int>(State::Pending)] = DateTime::Now().GetTicks();

	// Clear execution fields
	ExecutionLocation.clear();
	CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
	CpuSeconds.store(0.0f, std::memory_order_relaxed);

	// Increment retry count
	RetryCount.fetch_add(1, std::memory_order_relaxed);

	// Re-enter the scheduler pipeline
	m_OwnerSession->PostUpdate(this);

	return true;
}
+
// Advance the action's lifecycle state. Transitions are forward-only in enum
// order; a same-state or backwards transition is silently ignored. On a
// successful transition the owning session is notified via PostUpdate. The CAS
// loop retries when a concurrent writer changes the state between the load and
// the exchange.
//
// NOTE(review): the timestamp for NewState is recorded before the transition is
// validated, so a rejected (backwards/duplicate) call still overwrites that
// state's timestamp — confirm this is intended.
void
RunnerAction::SetActionState(State NewState)
{
	ZEN_ASSERT(NewState < State::_Count);
	this->Timestamps[static_cast<int>(NewState)] = DateTime::Now().GetTicks();

	do
	{
		if (State CurrentState = m_ActionState.load(); CurrentState == NewState)
		{
			// No state change
			return;
		}
		else
		{
			if (NewState <= CurrentState)
			{
				// Cannot transition to an earlier or same state
				return;
			}

			if (m_ActionState.compare_exchange_strong(CurrentState, NewState))
			{
				// Successful state change

				m_OwnerSession->PostUpdate(this);

				return;
			}
		}
	} while (true);
}
+
// Store the result package produced by the runner for this action.
void
RunnerAction::SetResult(CbPackage&& Result)
{
	m_Result = std::move(Result);
}

// Access the stored result. Only valid once the action has reached a terminal
// state (Completed/Failed/Abandoned/Cancelled), as asserted.
CbPackage&
RunnerAction::GetResult()
{
	ZEN_ASSERT(IsCompleted());
	return m_Result;
}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
new file mode 100644
index 000000000..f67414dbb
--- /dev/null
+++ b/src/zencompute/runners/functionrunner.h
@@ -0,0 +1,214 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/computeservice.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <atomic>
+# include <filesystem>
+# include <vector>
+
+namespace zen::compute {
+
/// Outcome of submitting an action to a runner. When IsAccepted is false,
/// Reason may carry a human-readable explanation (it can also be empty).
struct SubmitResult
{
	bool        IsAccepted = false;
	std::string Reason;
};
+
/** Base interface for classes implementing a remote execution "runner".
 *
 *  A runner accepts actions for execution (singly or in batches) and reports
 *  its capacity and health. Concrete implementations include local, per-OS and
 *  remote-HTTP runners. Instances are reference-counted and non-movable.
 */
class FunctionRunner : public RefCounted
{
	FunctionRunner(FunctionRunner&&) = delete;
	FunctionRunner& operator=(FunctionRunner&&) = delete;

public:
	// BasePath is the root under which this runner keeps its "actions" dump directory.
	FunctionRunner(std::filesystem::path BasePath);
	virtual ~FunctionRunner() = 0;

	virtual void Shutdown() = 0;
	virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;

	[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
	[[nodiscard]] virtual size_t       GetSubmittedActionCount() = 0;
	[[nodiscard]] virtual bool         IsHealthy() = 0;
	// Defaults to 1 unless overridden.
	[[nodiscard]] virtual size_t QueryCapacity();
	// Default implementation forwards each action to SubmitAction().
	[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);

	// Best-effort cancellation of a specific in-flight action. Returns true if the
	// cancellation signal was successfully sent. The action will transition to Cancelled
	// asynchronously once the platform-level termination completes.
	virtual bool CancelAction(int /*ActionLsn*/) { return false; }

	// Cancel the remote queue corresponding to the given local QueueId.
	// Only meaningful for remote runners; local runners ignore this.
	virtual void CancelRemoteQueue(int /*QueueId*/) {}

protected:
	// Directory where action payloads are dumped when m_DumpActions is enabled.
	std::filesystem::path m_ActionsPath;
	bool                  m_DumpActions = false;
	void                  MaybeDumpAction(int ActionLsn, const CbObject& ActionObject);
};
+
/** Base class for RunnerGroup that operates on generic FunctionRunner references.
 *  All scheduling, capacity, and lifecycle logic lives here.
 */
class BaseRunnerGroup
{
public:
	size_t                    QueryCapacity();
	SubmitResult              SubmitAction(Ref<RunnerAction> Action);
	std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
	size_t                    GetSubmittedActionCount();
	void                      RegisterWorker(CbPackage Worker);
	void                      Shutdown();
	bool                      CancelAction(int ActionLsn);
	void                      CancelRemoteQueue(int QueueId);

	size_t GetRunnerCount()
	{
		return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); });
	}

protected:
	void AddRunnerInternal(FunctionRunner* Runner);

	// m_RunnersLock guards m_Runners (shared for iteration, exclusive for
	// add/remove). m_NextSubmitIndex is the round-robin cursor used by
	// SubmitAction; it may be stale relative to the current runner count.
	RwLock                          m_RunnersLock;
	std::vector<Ref<FunctionRunner>> m_Runners;
	std::atomic<int>                m_NextSubmitIndex{0};
};
+
/** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal.
 */
template<typename RunnerType>
struct RunnerGroup : public BaseRunnerGroup
{
	void AddRunner(RunnerType* Runner) { AddRunnerInternal(Runner); }

	// Remove (and shut down) every runner for which Pred returns true, under the
	// exclusive lock. Returns the number of runners removed.
	template<typename Predicate>
	size_t RemoveRunnerIf(Predicate&& Pred)
	{
		size_t RemovedCount = 0;
		m_RunnersLock.WithExclusiveLock([&] {
			auto It = m_Runners.begin();
			while (It != m_Runners.end())
			{
				if (Pred(static_cast<RunnerType&>(**It)))
				{
					// Shut the runner down before dropping the group's reference.
					(*It)->Shutdown();
					It = m_Runners.erase(It);
					++RemovedCount;
				}
				else
				{
					++It;
				}
			}
		});
		return RemovedCount;
	}
};
+
/**
 * This represents an action going through different stages of scheduling and execution.
 *
 * State transitions are forward-only (see SetActionState); each state entry is
 * timestamped in the Timestamps array so latency can be measured per stage.
 */
struct RunnerAction : public RefCounted
{
	explicit RunnerAction(ComputeServiceSession* OwnerSession);
	~RunnerAction();

	int         ActionLsn = 0;           // Session-local sequence number identifying this action
	int         QueueId   = 0;           // Local queue the action was submitted through
	WorkerDesc  Worker;
	IoHash      ActionId;
	CbObject    ActionObj;               // Raw compact-binary action definition
	int         Priority = 0;
	std::string ExecutionLocation;       // "local" or remote hostname

	// CPU usage and total CPU time of the running process, sampled periodically by the local runner.
	// CpuUsagePercent: -1.0 means not yet sampled; >=0.0 is the most recent reading as a percentage.
	// CpuSeconds: total CPU time (user+system) consumed since process start, in seconds. 0.0 if not yet sampled.
	std::atomic<float> CpuUsagePercent{-1.0f};
	std::atomic<float> CpuSeconds{0.0f};
	// Number of times this action was reset to Pending (see ResetActionStateToPending).
	std::atomic<int> RetryCount{0};

	// Lifecycle states, ordered: transitions only move to a numerically larger state.
	enum class State
	{
		New,
		Pending,
		Submitting,
		Running,
		Completed,
		Failed,
		Abandoned,
		Cancelled,
		_Count
	};

	static const char* ToString(State _)
	{
		switch (_)
		{
			case State::New:
				return "New";
			case State::Pending:
				return "Pending";
			case State::Submitting:
				return "Submitting";
			case State::Running:
				return "Running";
			case State::Completed:
				return "Completed";
			case State::Failed:
				return "Failed";
			case State::Abandoned:
				return "Abandoned";
			case State::Cancelled:
				return "Cancelled";
			default:
				return "Unknown";
		}
	}

	// Inverse of ToString (exact, case-sensitive match); returns Default when
	// the name does not match any state.
	static State FromString(std::string_view Name, State Default = State::Failed)
	{
		for (int i = 0; i < static_cast<int>(State::_Count); ++i)
		{
			if (Name == ToString(static_cast<State>(i)))
			{
				return static_cast<State>(i);
			}
		}
		return Default;
	}

	// Tick count recorded when each state was (last) entered; 0 = never entered.
	uint64_t Timestamps[static_cast<int>(State::_Count)] = {};

	State ActionState() const { return m_ActionState; }
	void  SetActionState(State NewState);

	bool IsSuccess() const { return ActionState() == State::Completed; }
	bool ResetActionStateToPending();
	// True once the action is in any terminal state.
	bool IsCompleted() const
	{
		return ActionState() == State::Completed || ActionState() == State::Failed || ActionState() == State::Abandoned ||
		       ActionState() == State::Cancelled;
	}

	void       SetResult(CbPackage&& Result);
	CbPackage& GetResult();

	ComputeServiceSession* GetOwnerSession() const { return m_OwnerSession; }

private:
	std::atomic<State>     m_ActionState  = State::New;
	ComputeServiceSession* m_OwnerSession = nullptr;
	CbPackage              m_Result;
};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp
new file mode 100644
index 000000000..e79a6c90f
--- /dev/null
+++ b/src/zencompute/runners/linuxrunner.cpp
@@ -0,0 +1,734 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "linuxrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_LINUX
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+# include <fcntl.h>
+# include <sched.h>
+# include <signal.h>
+# include <sys/mount.h>
+# include <sys/stat.h>
+# include <sys/syscall.h>
+# include <sys/wait.h>
+# include <unistd.h>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+namespace {
+
+ // All helper functions in this namespace are async-signal-safe (safe to call
+ // between fork() and execve()). They use only raw syscalls and avoid any
+ // heap allocation, stdio, or other non-AS-safe operations.
+
+ void WriteToFd(int Fd, const char* Buf, size_t Len)
+ {
+ while (Len > 0)
+ {
+ ssize_t Written = write(Fd, Buf, Len);
+ if (Written <= 0)
+ {
+ break;
+ }
+ Buf += Written;
+ Len -= static_cast<size_t>(Written);
+ }
+ }
+
+ [[noreturn]] void WriteErrorAndExit(int ErrorPipeFd, const char* Msg, int Errno)
+ {
+ // Write the message prefix
+ size_t MsgLen = 0;
+ for (const char* P = Msg; *P; ++P)
+ {
+ ++MsgLen;
+ }
+ WriteToFd(ErrorPipeFd, Msg, MsgLen);
+
+ // Append ": " and the errno string if non-zero
+ if (Errno != 0)
+ {
+ WriteToFd(ErrorPipeFd, ": ", 2);
+ const char* ErrStr = strerror(Errno);
+ size_t ErrLen = 0;
+ for (const char* P = ErrStr; *P; ++P)
+ {
+ ++ErrLen;
+ }
+ WriteToFd(ErrorPipeFd, ErrStr, ErrLen);
+ }
+
+ _exit(127);
+ }
+
+ int MkdirIfNeeded(const char* Path, mode_t Mode)
+ {
+ if (mkdir(Path, Mode) != 0 && errno != EEXIST)
+ {
+ return -1;
+ }
+ return 0;
+ }
+
+ int BindMountReadOnly(const char* Src, const char* Dst)
+ {
+ if (mount(Src, Dst, nullptr, MS_BIND | MS_REC, nullptr) != 0)
+ {
+ return -1;
+ }
+
+ // Remount read-only
+ if (mount(nullptr, Dst, nullptr, MS_REMOUNT | MS_BIND | MS_RDONLY | MS_REC, nullptr) != 0)
+ {
+ return -1;
+ }
+
+ return 0;
+ }
+
+ // Set up namespace-based sandbox isolation in the child process.
+ // This is called after fork(), before execve(). All operations must be
+ // async-signal-safe.
+ //
+ // The sandbox layout after pivot_root:
+ // / -> the sandbox directory (tmpfs-like, was SandboxPath)
+ // /usr -> bind-mount of host /usr (read-only)
+ // /lib -> bind-mount of host /lib (read-only)
+ // /lib64 -> bind-mount of host /lib64 (read-only, optional)
+ // /etc -> bind-mount of host /etc (read-only)
+ // /worker -> bind-mount of worker directory (read-only)
+ // /proc -> proc filesystem
+ // /dev -> tmpfs with null, zero, urandom
    void SetupNamespaceSandbox(const char* SandboxPath, uid_t Uid, gid_t Gid, const char* WorkerPath, int ErrorPipeFd)
    {
        // 1. Unshare user, mount, and network namespaces
        //    CLONE_NEWUSER is what lets an unprivileged process perform the
        //    mounts and pivot_root below; CLONE_NEWNET removes network access.
        if (unshare(CLONE_NEWUSER | CLONE_NEWNS | CLONE_NEWNET) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "unshare() failed", errno);
        }

        // 2. Write UID/GID mappings
        // Must deny setgroups first (required by kernel for unprivileged user namespaces)
        {
            int Fd = open("/proc/self/setgroups", O_WRONLY);
            if (Fd >= 0)
            {
                WriteToFd(Fd, "deny", 4);
                close(Fd);
            }
            // setgroups file may not exist on older kernels; not fatal
        }

        {
            // uid_map: map our UID to 0 inside the namespace — this grants us
            // full capabilities inside the new namespaces (and only there).
            //
            // NOTE(review): snprintf is not on POSIX's async-signal-safe list;
            // glibc avoids allocation for simple numeric formats like these,
            // but confirm this holds for every supported libc.
            char Buf[64];
            int Len = snprintf(Buf, sizeof(Buf), "0 %u 1\n", static_cast<unsigned>(Uid));

            int Fd = open("/proc/self/uid_map", O_WRONLY);
            if (Fd < 0)
            {
                WriteErrorAndExit(ErrorPipeFd, "open uid_map failed", errno);
            }
            WriteToFd(Fd, Buf, static_cast<size_t>(Len));
            close(Fd);
        }

        {
            // gid_map: map our GID to 0 inside the namespace
            char Buf[64];
            int Len = snprintf(Buf, sizeof(Buf), "0 %u 1\n", static_cast<unsigned>(Gid));

            int Fd = open("/proc/self/gid_map", O_WRONLY);
            if (Fd < 0)
            {
                WriteErrorAndExit(ErrorPipeFd, "open gid_map failed", errno);
            }
            WriteToFd(Fd, Buf, static_cast<size_t>(Len));
            close(Fd);
        }

        // 3. Privatize the entire mount tree so our mounts don't propagate
        //    back to the host mount namespace.
        if (mount(nullptr, "/", nullptr, MS_REC | MS_PRIVATE, nullptr) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mount MS_PRIVATE failed", errno);
        }

        // 4. Create mount points inside the sandbox and bind-mount system directories

        // Helper macro-like pattern for building paths inside sandbox
        // We use stack buffers since we can't allocate heap memory safely.
        // Note: BuildPath reuses a single buffer, so each result must be
        // consumed before the next call — the code below always does.
        char MountPoint[4096];

        auto BuildPath = [&](const char* Suffix) -> const char* {
            snprintf(MountPoint, sizeof(MountPoint), "%s/%s", SandboxPath, Suffix);
            return MountPoint;
        };

        // /usr (required)
        if (MkdirIfNeeded(BuildPath("usr"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/usr failed", errno);
        }
        if (BindMountReadOnly("/usr", BuildPath("usr")) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "bind mount /usr failed", errno);
        }

        // /lib (required)
        if (MkdirIfNeeded(BuildPath("lib"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/lib failed", errno);
        }
        if (BindMountReadOnly("/lib", BuildPath("lib")) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "bind mount /lib failed", errno);
        }

        // /lib64 (optional — not all distros have it)
        {
            struct stat St;
            if (stat("/lib64", &St) == 0 && S_ISDIR(St.st_mode))
            {
                if (MkdirIfNeeded(BuildPath("lib64"), 0755) == 0)
                {
                    BindMountReadOnly("/lib64", BuildPath("lib64"));
                    // Failure is non-fatal for lib64
                }
            }
        }

        // /etc (required — for resolv.conf, ld.so.cache, etc.)
        if (MkdirIfNeeded(BuildPath("etc"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/etc failed", errno);
        }
        if (BindMountReadOnly("/etc", BuildPath("etc")) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "bind mount /etc failed", errno);
        }

        // /worker — bind-mount worker directory (contains the executable)
        if (MkdirIfNeeded(BuildPath("worker"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/worker failed", errno);
        }
        if (BindMountReadOnly(WorkerPath, BuildPath("worker")) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "bind mount worker dir failed", errno);
        }

        // 5. Mount /proc inside sandbox
        if (MkdirIfNeeded(BuildPath("proc"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/proc failed", errno);
        }
        if (mount("proc", BuildPath("proc"), "proc", MS_NOSUID | MS_NOEXEC | MS_NODEV, nullptr) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mount /proc failed", errno);
        }

        // 6. Mount tmpfs /dev and bind-mount essential device nodes
        if (MkdirIfNeeded(BuildPath("dev"), 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/dev failed", errno);
        }
        if (mount("tmpfs", BuildPath("dev"), "tmpfs", MS_NOSUID | MS_NOEXEC, "size=64k,mode=0755") != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mount tmpfs /dev failed", errno);
        }

        // Bind-mount /dev/null, /dev/zero, /dev/urandom
        // (device nodes can't be created with mknod in an unprivileged user
        // namespace, so bind-mounting the host's nodes is the standard trick)
        {
            char DevSrc[64];
            char DevDst[4096];

            auto BindDev = [&](const char* Name) {
                snprintf(DevSrc, sizeof(DevSrc), "/dev/%s", Name);
                snprintf(DevDst, sizeof(DevDst), "%s/dev/%s", SandboxPath, Name);

                // Create the file to mount over
                int Fd = open(DevDst, O_WRONLY | O_CREAT, 0666);
                if (Fd >= 0)
                {
                    close(Fd);
                }
                mount(DevSrc, DevDst, nullptr, MS_BIND, nullptr);
                // Non-fatal if individual devices fail
            };

            BindDev("null");
            BindDev("zero");
            BindDev("urandom");
        }

        // 7. pivot_root to sandbox
        // pivot_root requires the new root and put_old to be mount points.
        // Bind-mount sandbox onto itself to make it a mount point.
        if (mount(SandboxPath, SandboxPath, nullptr, MS_BIND | MS_REC, nullptr) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "bind mount sandbox onto itself failed", errno);
        }

        // Create .pivot_old inside sandbox
        char PivotOld[4096];
        snprintf(PivotOld, sizeof(PivotOld), "%s/.pivot_old", SandboxPath);
        if (MkdirIfNeeded(PivotOld, 0755) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "mkdir .pivot_old failed", errno);
        }

        // Raw syscall: glibc provides no pivot_root() wrapper.
        if (syscall(SYS_pivot_root, SandboxPath, PivotOld) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "pivot_root failed", errno);
        }

        // 8. Now inside new root. Clean up old root.
        if (chdir("/") != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "chdir / failed", errno);
        }

        // Lazily detach the old root so the host filesystem becomes unreachable.
        if (umount2("/.pivot_old", MNT_DETACH) != 0)
        {
            WriteErrorAndExit(ErrorPipeFd, "umount2 .pivot_old failed", errno);
        }

        // Best-effort removal of the now-empty mount point.
        rmdir("/.pivot_old");
    }
+
+} // anonymous namespace
+
LinuxProcessRunner::LinuxProcessRunner(ChunkResolver& Resolver,
                                       const std::filesystem::path& BaseDir,
                                       DeferredDirectoryDeleter& Deleter,
                                       WorkerThreadPool& WorkerPool,
                                       bool Sandboxed,
                                       int32_t MaxConcurrentActions)
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_Sandboxed(Sandboxed)
{
    // Restore SIGCHLD to default behavior so waitpid() can properly collect
    // child exit status. zenserver/main.cpp sets SIGCHLD to SIG_IGN which
    // causes the kernel to auto-reap children, making waitpid() return
    // -1/ECHILD instead of the exit status we need.
    //
    // NOTE(review): signal dispositions are process-wide, so this also affects
    // any other component that forks — confirm nothing else relies on the
    // SIG_IGN auto-reap behavior.
    struct sigaction Action = {};
    sigemptyset(&Action.sa_mask);
    Action.sa_handler = SIG_DFL;
    sigaction(SIGCHLD, &Action, nullptr);

    if (m_Sandboxed)
    {
        ZEN_INFO("namespace sandboxing enabled for child processes");
    }
}
+
// Fork/exec one prepared action, optionally inside a namespace sandbox.
// Everything the child needs (argv, envp, path strings) is built BEFORE
// fork() so the child only performs async-signal-safe operations until
// execve(). Returns IsAccepted=false when the action cannot be prepared or
// the sandbox setup fails.
SubmitResult
LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
    ZEN_TRACE_CPU("LinuxProcessRunner::SubmitAction");
    std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);

    if (!Prepared)
    {
        return SubmitResult{.IsAccepted = false};
    }

    // Build environment array from worker descriptor

    CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();

    std::vector<std::string> EnvStrings;
    for (auto& It : WorkerDescription["environment"sv])
    {
        EnvStrings.emplace_back(It.AsString());
    }

    // Envp pointers alias EnvStrings' storage; both stay alive until execve()
    // (the child gets a copy-on-write view of this frame via fork()).
    std::vector<char*> Envp;
    Envp.reserve(EnvStrings.size() + 1);
    for (auto& Str : EnvStrings)
    {
        Envp.push_back(Str.data());
    }
    Envp.push_back(nullptr);

    // Build argv: <worker_exe_path> -Build=build.action
    // Pre-compute all path strings before fork() for async-signal-safety.

    std::string_view ExecPath = WorkerDescription["path"sv].AsString();
    std::string ExePathStr;
    std::string SandboxedExePathStr;

    if (m_Sandboxed)
    {
        // After pivot_root, the worker dir is at /worker inside the new root
        std::filesystem::path SandboxedExePath = std::filesystem::path("/worker") / std::filesystem::path(ExecPath);
        SandboxedExePathStr = SandboxedExePath.string();
        // We still need the real path for logging
        ExePathStr = (Prepared->WorkerPath / std::filesystem::path(ExecPath)).string();
    }
    else
    {
        ExePathStr = (Prepared->WorkerPath / std::filesystem::path(ExecPath)).string();
    }

    std::string BuildArg = "-Build=build.action";

    // argv[0] should be the path the child will see
    const std::string& ChildExePath = m_Sandboxed ? SandboxedExePathStr : ExePathStr;

    std::vector<char*> ArgV;
    ArgV.push_back(const_cast<char*>(ChildExePath.data()));
    ArgV.push_back(BuildArg.data());
    ArgV.push_back(nullptr);

    ZEN_DEBUG("Executing: {} {} (sandboxed={})", ExePathStr, BuildArg, m_Sandboxed);

    std::string SandboxPathStr = Prepared->SandboxPath.string();
    std::string WorkerPathStr = Prepared->WorkerPath.string();

    // Pre-fork: get uid/gid for namespace mapping, create error pipe
    uid_t CurrentUid = 0;
    gid_t CurrentGid = 0;
    int ErrorPipe[2] = {-1, -1};

    if (m_Sandboxed)
    {
        CurrentUid = getuid();
        CurrentGid = getgid();

        // O_CLOEXEC makes a successful execve() close the write end, which is
        // how the parent distinguishes success (EOF) from failure (a message).
        if (pipe2(ErrorPipe, O_CLOEXEC) != 0)
        {
            throw zen::runtime_error("pipe2() for sandbox error pipe failed: {}", strerror(errno));
        }
    }

    pid_t ChildPid = fork();

    if (ChildPid < 0)
    {
        int SavedErrno = errno;
        if (m_Sandboxed)
        {
            close(ErrorPipe[0]);
            close(ErrorPipe[1]);
        }
        throw zen::runtime_error("fork() failed: {}", strerror(SavedErrno));
    }

    if (ChildPid == 0)
    {
        // Child process — only async-signal-safe operations from here until
        // execve(); never return to the caller's code.

        if (m_Sandboxed)
        {
            // Close read end of error pipe — child only writes
            close(ErrorPipe[0]);

            SetupNamespaceSandbox(SandboxPathStr.c_str(), CurrentUid, CurrentGid, WorkerPathStr.c_str(), ErrorPipe[1]);

            // After pivot_root, CWD is "/" which is the sandbox root.
            // execve with the sandboxed path.
            execve(SandboxedExePathStr.c_str(), ArgV.data(), Envp.data());

            WriteErrorAndExit(ErrorPipe[1], "execve failed", errno);
        }
        else
        {
            // Non-sandboxed: just run the worker with the sandbox dir as CWD.
            if (chdir(SandboxPathStr.c_str()) != 0)
            {
                _exit(127);
            }

            execve(ExePathStr.c_str(), ArgV.data(), Envp.data());
            _exit(127);
        }
    }

    // Parent process

    if (m_Sandboxed)
    {
        // Close write end of error pipe — parent only reads
        close(ErrorPipe[1]);

        // Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
        // and read returns 0. If setup failed, child wrote an error message.
        //
        // NOTE(review): a read() failure (-1, e.g. EINTR) is treated like the
        // success path; the sweep will still collect the child if it died —
        // confirm this is intended.
        char ErrBuf[512];
        ssize_t BytesRead = read(ErrorPipe[0], ErrBuf, sizeof(ErrBuf) - 1);
        close(ErrorPipe[0]);

        if (BytesRead > 0)
        {
            // Sandbox setup or execve failed
            ErrBuf[BytesRead] = '\0';

            // Reap the child (it called _exit(127))
            waitpid(ChildPid, nullptr, 0);

            // Clean up the sandbox in the background
            m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));

            ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);

            Action->SetActionState(RunnerAction::State::Failed);
            return SubmitResult{.IsAccepted = false};
        }
    }

    // Store child pid as void* (same convention as zencore/process.cpp)

    Ref<RunningAction> NewAction{new RunningAction()};
    NewAction->Action = Action;
    NewAction->ProcessHandle = reinterpret_cast<void*>(static_cast<intptr_t>(ChildPid));
    NewAction->SandboxPath = std::move(Prepared->SandboxPath);

    {
        RwLock::ExclusiveLockScope _(m_RunningLock);
        m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
    }

    Action->SetActionState(RunnerAction::State::Running);

    return SubmitResult{.IsAccepted = true};
}
+
+void
+LinuxProcessRunner::SweepRunningActions()
+{
+ ZEN_TRACE_CPU("LinuxProcessRunner::SweepRunningActions");
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Running = It->second;
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+ int Status = 0;
+
+ pid_t Result = waitpid(Pid, &Status, WNOHANG);
+
+ if (Result == Pid)
+ {
+ if (WIFEXITED(Status))
+ {
+ Running->ExitCode = WEXITSTATUS(Status);
+ }
+ else if (WIFSIGNALED(Status))
+ {
+ Running->ExitCode = 128 + WTERMSIG(Status);
+ }
+ else
+ {
+ Running->ExitCode = 1;
+ }
+
+ Running->ProcessHandle = nullptr;
+
+ CompletedActions.push_back(std::move(Running));
+ It = m_RunningMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ ProcessCompletedActions(CompletedActions);
+}
+
// Terminate every tracked child: SIGTERM first, then SIGKILL for any process
// that has not exited within ~2 seconds, then enqueue sandbox cleanup and mark
// each action Failed. Invoked from Shutdown() after the monitor thread has
// been joined, so no other thread is sweeping these PIDs concurrently.
void
LinuxProcessRunner::CancelRunningActions()
{
    ZEN_TRACE_CPU("LinuxProcessRunner::CancelRunningActions");
    Stopwatch Timer;
    std::unordered_map<int, Ref<RunningAction>> RunningMap;

    // Take ownership of the whole running set under the lock so the rest of
    // the teardown can proceed without holding it.
    m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });

    if (RunningMap.empty())
    {
        return;
    }

    ZEN_INFO("cancelling all running actions");

    // Send SIGTERM to all running processes first

    for (const auto& [Lsn, Running] : RunningMap)
    {
        pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));

        if (kill(Pid, SIGTERM) != 0)
        {
            ZEN_WARN("kill(SIGTERM) for LSN {} (pid {}) failed: {}", Running->Action->ActionLsn, Pid, strerror(errno));
        }
    }

    // Wait for all processes, regardless of whether SIGTERM succeeded, then clean up.
    // NOTE(review): the grace periods are serial — with N stubborn children the
    // worst case is roughly N * 2 seconds of shutdown latency.

    for (auto& [Lsn, Running] : RunningMap)
    {
        pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));

        // Poll for up to 2 seconds
        bool Exited = false;
        for (int i = 0; i < 20; ++i)
        {
            int Status = 0;
            pid_t WaitResult = waitpid(Pid, &Status, WNOHANG);
            if (WaitResult == Pid)
            {
                Exited = true;
                ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
                break;
            }
            usleep(100000); // 100ms
        }

        if (!Exited)
        {
            ZEN_WARN("LSN {}: process did not exit after SIGTERM, sending SIGKILL", Running->Action->ActionLsn);
            kill(Pid, SIGKILL);
            waitpid(Pid, nullptr, 0);
        }

        // Sandbox contents are removed in the background; the action never
        // produced a result, so it is reported as Failed.
        m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
        Running->Action->SetActionState(RunnerAction::State::Failed);
    }

    ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
+
+bool
+LinuxProcessRunner::CancelAction(int ActionLsn)
+{
+ ZEN_TRACE_CPU("LinuxProcessRunner::CancelAction");
+
+ // Hold the shared lock while sending the signal to prevent the sweep thread
+ // from reaping the PID (via waitpid) between our lookup and kill(). Without
+ // the lock held, the PID could be recycled by the kernel and we'd signal an
+ // unrelated process.
+ bool Sent = false;
+
+ m_RunningLock.WithSharedLock([&] {
+ auto It = m_RunningMap.find(ActionLsn);
+ if (It == m_RunningMap.end())
+ {
+ return;
+ }
+
+ Ref<RunningAction> Target = It->second;
+ if (!Target->ProcessHandle)
+ {
+ return;
+ }
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Target->ProcessHandle));
+
+ if (kill(Pid, SIGTERM) != 0)
+ {
+ ZEN_WARN("CancelAction: kill(SIGTERM) for LSN {} (pid {}) failed: {}", ActionLsn, Pid, strerror(errno));
+ return;
+ }
+
+ ZEN_DEBUG("CancelAction: sent SIGTERM to LSN {} (pid {})", ActionLsn, Pid);
+ Sent = true;
+ });
+
+ // The monitor thread will pick up the process exit and mark the action as Failed.
+ return Sent;
+}
+
// Return the cumulative CPU time (utime + stime, in clock ticks) consumed by
// process Pid, parsed from /proc/<pid>/stat. Returns 0 when the process does
// not exist, the file cannot be read, or the fields cannot be parsed.
static uint64_t
ReadProcStatCpuTicks(pid_t Pid)
{
    char Path[64];
    snprintf(Path, sizeof(Path), "/proc/%d/stat", static_cast<int>(Pid));

    // /proc/<pid>/stat can exceed 256 bytes (comm is up to 16 chars and each
    // fault counter can be up to 20 digits), which previously risked reading
    // a truncated line and mis-parsing utime/stime; 512 bytes comfortably
    // covers the prefix we need.
    char Buf[512];
    int Fd = open(Path, O_RDONLY);
    if (Fd < 0)
    {
        return 0;
    }

    ssize_t Len = read(Fd, Buf, sizeof(Buf) - 1);
    close(Fd);

    if (Len <= 0)
    {
        return 0;
    }

    Buf[Len] = '\0';

    // Skip past "pid (name) " — find last ')' to handle names containing spaces or parens
    const char* P = strrchr(Buf, ')');
    if (!P)
    {
        return 0;
    }

    // Guard against a buffer that ends right at the ')' so P + 2 cannot run
    // past the terminating NUL.
    if (P[1] == '\0' || P[2] == '\0')
    {
        return 0;
    }

    P += 2; // skip ') '

    // Remaining fields (space-separated, 0-indexed from here):
    // 0:state 1:ppid 2:pgrp 3:session 4:tty_nr 5:tty_pgrp 6:flags
    // 7:minflt 8:cminflt 9:majflt 10:cmajflt 11:utime 12:stime
    unsigned long UTime = 0;
    unsigned long STime = 0;
    if (sscanf(P, "%*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %lu %lu", &UTime, &STime) != 2)
    {
        // Parse failure (e.g. unexpected format) — report no usage rather than garbage
        return 0;
    }
    return UTime + STime;
}
+
// Sample the child's CPU consumption from /proc/<pid>/stat and publish both
// cumulative CPU seconds and an instantaneous usage figure (percent of one
// core; can exceed 100 for multithreaded children) on the owning RunnerAction.
void
LinuxProcessRunner::SampleProcessCpu(RunningAction& Running)
{
    // Ticks-per-second is constant for the process lifetime; function-local
    // static initialization is thread-safe, so sysconf() runs exactly once.
    static const long ClkTck = sysconf(_SC_CLK_TCK);

    const pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running.ProcessHandle));

    const uint64_t NowTicks = GetHifreqTimerValue();
    const uint64_t CurrentOsTicks = ReadProcStatCpuTicks(Pid);

    if (CurrentOsTicks == 0)
    {
        // Process gone or /proc entry unreadable — record timestamp without updating usage
        Running.LastCpuSampleTicks = NowTicks;
        Running.LastCpuOsTicks = 0;
        return;
    }

    // Cumulative CPU seconds (absolute, available from first sample)
    Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / ClkTck), std::memory_order_relaxed);

    // Percent usage needs two samples: the first call only records a baseline.
    if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
    {
        const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - Running.LastCpuSampleTicks);
        if (ElapsedMs > 0)
        {
            // (DeltaOsTicks / ClkTck) seconds of CPU spent over ElapsedMs of
            // wall-clock time, expressed in percent.
            const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
            const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) * 1000.0 / ClkTck / ElapsedMs * 100.0);
            Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
        }
    }

    Running.LastCpuSampleTicks = NowTicks;
    Running.LastCpuOsTicks = CurrentOsTicks;
}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/linuxrunner.h b/src/zencompute/runners/linuxrunner.h
new file mode 100644
index 000000000..266de366b
--- /dev/null
+++ b/src/zencompute/runners/linuxrunner.h
@@ -0,0 +1,44 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_LINUX
+
+namespace zen::compute {
+
+/** Native Linux process runner for executing Linux worker executables directly.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and monitor thread infrastructure. Overrides only the
+ platform-specific methods: process spawning, sweep, and cancellation.
+
+ When Sandboxed is true, child processes are isolated using Linux namespaces:
+ user, mount, and network namespaces are unshared so the child has no network
+ access and can only see the sandbox directory (with system libraries bind-mounted
+ read-only). This requires no special privileges thanks to user namespaces.
+ */
class LinuxProcessRunner : public LocalProcessRunner
{
public:
    // Sandboxed:            when true, children run inside user/mount/network
    //                       namespaces (see linuxrunner.cpp for the layout)
    // MaxConcurrentActions: <= 0 lets the base class derive a limit from the
    //                       logical processor count
    LinuxProcessRunner(ChunkResolver& Resolver,
                       const std::filesystem::path& BaseDir,
                       DeferredDirectoryDeleter& Deleter,
                       WorkerThreadPool& WorkerPool,
                       bool Sandboxed = false,
                       int32_t MaxConcurrentActions = 0);

    // Fork/exec the worker for one action (optionally inside a namespace sandbox).
    [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
    // Reap exited children with waitpid(WNOHANG) and report them as completed.
    void SweepRunningActions() override;
    // SIGTERM (then SIGKILL) every tracked child; used during shutdown.
    void CancelRunningActions() override;
    // Request termination of a single action via SIGTERM.
    bool CancelAction(int ActionLsn) override;
    // Sample /proc/<pid>/stat to publish CPU usage on the action.
    void SampleProcessCpu(RunningAction& Running) override;

private:
    bool m_Sandboxed = false; // enable namespace isolation for child processes
};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
new file mode 100644
index 000000000..7aaefb06e
--- /dev/null
+++ b/src/zencompute/runners/localrunner.cpp
@@ -0,0 +1,674 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ int32_t MaxConcurrentActions)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("local_exec"))
+, m_ChunkResolver(Resolver)
+, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers"))
+, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch"))
+, m_DeferredDeleter(Deleter)
+, m_WorkerPool(WorkerPool)
+{
+ SystemMetrics Sm = GetSystemMetricsForReporting();
+
+ m_MaxRunningActions = Sm.LogicalProcessorCount * 2;
+
+ if (MaxConcurrentActions > 0)
+ {
+ m_MaxRunningActions = MaxConcurrentActions;
+ }
+
+ ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions);
+
+ bool DidCleanup = false;
+
+ if (std::filesystem::is_directory(m_ActionsPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_ActionsPath);
+
+ std::error_code Ec;
+ CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ if (std::filesystem::is_directory(m_SandboxPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_SandboxPath);
+ std::error_code Ec;
+ CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ // We clean out all workers on startup since we can't know they are good. They could be bad
+ // due to tampering, malware (which I also mean to include AV and antimalware software) or
+ // other processes we have no control over
+ if (std::filesystem::is_directory(m_WorkerPath))
+ {
+ ZEN_INFO("Cleaning '{}'", m_WorkerPath);
+ std::error_code Ec;
+ CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message());
+ }
+
+ DidCleanup = true;
+ }
+
+ if (DidCleanup)
+ {
+ ZEN_INFO("Cleanup complete");
+ }
+
+ m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+
+# if ZEN_PLATFORM_WINDOWS
+ // Suppress any error dialogs caused by missing dependencies
+ UINT OldMode = ::SetErrorMode(0);
+ ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS);
+# endif
+
+ m_AcceptNewActions = true;
+}
+
+LocalProcessRunner::~LocalProcessRunner()
+{
+ try
+ {
+ Shutdown();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception during local process runner shutdown: {}", Ex.what());
+ }
+}
+
// Orderly teardown: refuse new work, stop and join the monitor thread, then
// cancel whatever is still running. The ordering matters — joining the
// monitor first guarantees no sweep runs concurrently with the cancellation.
void
LocalProcessRunner::Shutdown()
{
    ZEN_TRACE_CPU("LocalProcessRunner::Shutdown");
    m_AcceptNewActions = false;

    // Wake the monitor thread so it observes the disabled flag promptly.
    m_MonitorThreadEnabled = false;
    m_MonitorThreadEvent.Set();
    if (m_MonitorThread.joinable())
    {
        m_MonitorThread.join();
    }

    CancelRunningActions();
}
+
+std::filesystem::path
+LocalProcessRunner::CreateNewSandbox()
+{
+ ZEN_TRACE_CPU("LocalProcessRunner::CreateNewSandbox");
+ std::string UniqueId = std::to_string(++m_SandboxCounter);
+ std::filesystem::path Path = m_SandboxPath / UniqueId;
+ zen::CreateDirectories(Path);
+
+ return Path;
+}
+
+void
+LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+ ZEN_TRACE_CPU("LocalProcessRunner::RegisterWorker");
+ if (m_DumpActions)
+ {
+ CbObject WorkerDescriptor = WorkerPackage.GetObject();
+ const IoHash& WorkerId = WorkerPackage.GetObjectHash();
+
+ std::string UniqueId = fmt::format("worker_{}"sv, WorkerId);
+ std::filesystem::path Path = m_ActionsPath / UniqueId;
+
+ zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer());
+
+ ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) {
+ std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString();
+ zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed());
+ });
+
+ ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path);
+ }
+}
+
+size_t
+LocalProcessRunner::QueryCapacity()
+{
+ // Estimate how much more work we're ready to accept
+
+ RwLock::SharedLockScope _{m_RunningLock};
+
+ if (!m_AcceptNewActions)
+ {
+ return 0;
+ }
+
+ const size_t InFlightCount = m_RunningMap.size() + m_SubmittingCount.load(std::memory_order_relaxed);
+
+ if (const size_t MaxRunningActions = m_MaxRunningActions; InFlightCount >= MaxRunningActions)
+ {
+ return 0;
+ }
+ else
+ {
+ return MaxRunningActions - InFlightCount;
+ }
+}
+
// Batch submission entry point. Small batches run synchronously; larger ones
// are capacity-checked up front and dispatched to the worker pool so the
// scheduler thread never blocks on process launches.
std::vector<SubmitResult>
LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
    // Trivial batches are submitted inline on the calling thread.
    if (Actions.size() <= 1)
    {
        std::vector<SubmitResult> Results;

        for (const Ref<RunnerAction>& Action : Actions)
        {
            Results.push_back(SubmitAction(Action));
        }

        return Results;
    }

    // For nontrivial batches, check capacity upfront and accept what fits.
    // Accepted actions are transitioned to Submitting and dispatched to the
    // worker pool as fire-and-forget, so SubmitActions returns immediately
    // and the scheduler thread is free to handle completions and updates.

    size_t Available = QueryCapacity();

    std::vector<SubmitResult> Results(Actions.size());

    size_t AcceptCount = std::min(Available, Actions.size());

    for (size_t i = 0; i < AcceptCount; ++i)
    {
        const Ref<RunnerAction>& Action = Actions[i];

        // The Submitting state plus m_SubmittingCount keep QueryCapacity
        // honest while the asynchronous submission is still in flight.
        Action->SetActionState(RunnerAction::State::Submitting);
        m_SubmittingCount.fetch_add(1, std::memory_order_relaxed);

        Results[i] = SubmitResult{.IsAccepted = true};

        m_WorkerPool.ScheduleWork(
            [this, Action]() {
                // Always balance the fetch_add above, whatever SubmitAction does.
                auto CountGuard = MakeGuard([this] { m_SubmittingCount.fetch_sub(1, std::memory_order_relaxed); });

                SubmitResult Result = SubmitAction(Action);

                if (!Result.IsAccepted)
                {
                    // This might require another state? We should
                    // distinguish between outright rejections (e.g. invalid action)
                    // and transient failures (e.g. failed to launch process) which might
                    // be retried by the scheduler, but for now just fail the action
                    Action->SetActionState(RunnerAction::State::Failed);
                }
            },
            WorkerThreadPool::EMode::EnableBacklog);
    }

    // Everything beyond the capacity estimate is rejected outright; the
    // caller is expected to resubmit later.
    for (size_t i = AcceptCount; i < Actions.size(); ++i)
    {
        Results[i] = SubmitResult{.IsAccepted = false};
    }

    return Results;
}
+
// Platform-independent pre-submission work shared by all process runners:
// capacity check, sandbox creation, optional action dump, worker manifesting,
// and input materialization. Returns std::nullopt when the action cannot
// currently be accepted; throws when a required input chunk is missing (the
// scope guard then queues the half-built sandbox for deletion).
std::optional<LocalProcessRunner::PreparedAction>
LocalProcessRunner::PrepareActionSubmission(Ref<RunnerAction> Action)
{
    ZEN_TRACE_CPU("LocalProcessRunner::PrepareActionSubmission");

    // Verify whether we can accept more work

    {
        RwLock::SharedLockScope _{m_RunningLock};

        if (!m_AcceptNewActions)
        {
            return std::nullopt;
        }

        if (m_RunningMap.size() >= size_t(m_MaxRunningActions))
        {
            return std::nullopt;
        }
    }

    // Each enqueued action is assigned an integer index (logical sequence number),
    // which we use as a key for tracking data structures and as an opaque id which
    // may be used by clients to reference the scheduled action

    const int32_t ActionLsn = Action->ActionLsn;
    const CbObject& ActionObj = Action->ActionObj;

    MaybeDumpAction(ActionLsn, ActionObj);

    std::filesystem::path SandboxPath = CreateNewSandbox();

    // Ensure the sandbox directory is cleaned up if any subsequent step throws
    auto SandboxGuard = MakeGuard([&] { m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(SandboxPath)); });

    CbPackage WorkerPackage = Action->Worker.Descriptor;

    std::filesystem::path WorkerPath = ManifestWorker(Action->Worker);

    // Write out action

    zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());

    // Manifest inputs in sandbox
    // (a throw from this callback — e.g. a missing chunk — propagates out and
    // triggers SandboxGuard above)

    ActionObj.IterateAttachments([&](CbFieldView Field) {
        const IoHash Cid = Field.AsHash();
        std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()};
        IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid);

        if (!DataBuffer)
        {
            throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
        }

        zen::WriteFile(FilePath, DataBuffer);
    });

    Action->ExecutionLocation = "local";

    // Everything succeeded — ownership of the sandbox passes to the caller.
    SandboxGuard.Dismiss();

    return PreparedAction{
        .ActionLsn = ActionLsn,
        .SandboxPath = std::move(SandboxPath),
        .WorkerPath = std::move(WorkerPath),
        .WorkerPackage = std::move(WorkerPackage),
    };
}
+
SubmitResult
LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
    // Base class is not directly usable — platform subclasses override this
    // with the actual process-spawning logic. Rejecting here means a missing
    // override fails the action (see SubmitActions) rather than leaving it
    // stuck in the Submitting state.
    ZEN_UNUSED(Action);
    return SubmitResult{.IsAccepted = false};
}
+
+size_t
+LocalProcessRunner::GetSubmittedActionCount()
+{
+ RwLock::SharedLockScope _(m_RunningLock);
+ return m_RunningMap.size();
+}
+
// Resolve (and lazily create) the on-disk directory for a worker. Uses
// double-checked locking: the common case only takes the shared lock and
// finds the directory already manifested; the first caller for a given worker
// upgrades to the exclusive lock and performs the actual extraction.
std::filesystem::path
LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
{
    ZEN_TRACE_CPU("LocalProcessRunner::ManifestWorker");
    RwLock::SharedLockScope _(m_WorkerLock);

    std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);

    if (!std::filesystem::exists(WorkerDir))
    {
        // Lock upgrade: drop the shared lock, take the exclusive lock, and
        // re-check — another thread may have manifested the worker in the
        // window between the two locks.
        _.ReleaseNow();

        RwLock::ExclusiveLockScope $(m_WorkerLock);

        if (!std::filesystem::exists(WorkerDir))
        {
            ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
        }
    }

    return WorkerDir;
}
+
+void
+LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage,
+ CbObjectView FileEntry,
+ const std::filesystem::path& SandboxRootPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback)
+{
+ std::string_view Name = FileEntry["name"sv].AsString();
+ const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+
+ CompressedBuffer Compressed;
+
+ if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
+ {
+ Compressed = Attachment->AsCompressedBinary();
+ }
+ else
+ {
+ IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash);
+
+ if (!DataBuffer)
+ {
+ throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash));
+ }
+
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize);
+
+ if (DataRawSize != Size)
+ {
+ throw std::runtime_error(
+ fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size));
+ }
+ }
+
+ ChunkReferenceCallback(ChunkHash, Compressed);
+
+ std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()};
+
+ // Validate the resolved path stays within the sandbox to prevent directory traversal
+ // via malicious names like "../../etc/evil"
+ //
+ // This might be worth revisiting to frontload the validation and eliminate some memory
+ // allocations in the future.
+ {
+ std::filesystem::path CanonicalRoot = std::filesystem::weakly_canonical(SandboxRootPath);
+ std::filesystem::path CanonicalFile = std::filesystem::weakly_canonical(FilePath);
+ std::string RootStr = CanonicalRoot.string();
+ std::string FileStr = CanonicalFile.string();
+
+ if (FileStr.size() < RootStr.size() || FileStr.compare(0, RootStr.size(), RootStr) != 0)
+ {
+ throw zen::runtime_error("path traversal detected: '{}' escapes sandbox root '{}'", Name, SandboxRootPath);
+ }
+ }
+
+ SharedBuffer Decompressed = Compressed.Decompress();
+ zen::WriteFile(FilePath, Decompressed.AsIoBuffer());
+}
+
+void
+LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
+ const std::filesystem::path& SandboxPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback)
+{
+ CbObject WorkerDescription = WorkerPackage.GetObject();
+
+ // Manifest worker in Sandbox
+
+ for (auto& It : WorkerDescription["executables"sv])
+ {
+ DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+# if !ZEN_PLATFORM_WINDOWS
+ std::string_view ExeName = It.AsObjectView()["name"sv].AsString();
+ std::filesystem::path ExePath{SandboxPath / std::filesystem::path(ExeName).make_preferred()};
+ std::filesystem::permissions(
+ ExePath,
+ std::filesystem::perms::owner_exec | std::filesystem::perms::group_exec | std::filesystem::perms::others_exec,
+ std::filesystem::perm_options::add);
+# endif
+ }
+
+ for (auto& It : WorkerDescription["dirs"sv])
+ {
+ std::string_view Name = It.AsString();
+ std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()};
+
+ // Validate dir path stays within sandbox
+ {
+ std::filesystem::path CanonicalRoot = std::filesystem::weakly_canonical(SandboxPath);
+ std::filesystem::path CanonicalDir = std::filesystem::weakly_canonical(DirPath);
+ std::string RootStr = CanonicalRoot.string();
+ std::string DirStr = CanonicalDir.string();
+
+ if (DirStr.size() < RootStr.size() || DirStr.compare(0, RootStr.size(), RootStr) != 0)
+ {
+ throw zen::runtime_error("path traversal detected: dir '{}' escapes sandbox root '{}'", Name, SandboxPath);
+ }
+ }
+
+ zen::CreateDirectories(DirPath);
+ }
+
+ for (auto& It : WorkerDescription["files"sv])
+ {
+ DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+ }
+
+ WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
+}
+
+CbPackage
+LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
+{
+ ZEN_TRACE_CPU("LocalProcessRunner::GatherActionOutputs");
+ std::filesystem::path OutputFile = SandboxPath / "build.output";
+ FileContents OutputData = zen::ReadFile(OutputFile);
+
+ if (OutputData.ErrorCode)
+ {
+ throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile));
+ }
+
+ CbPackage OutputPackage;
+ CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten());
+
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalRawAttachmentBytes = 0;
+
+ Output.IterateAttachments([&](CbFieldView Field) {
+ IoHash Hash = Field.AsHash();
+ std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
+ FileContents ChunkData = zen::ReadFile(OutputPath);
+
+ if (ChunkData.ErrorCode)
+ {
+ throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath));
+ }
+
+ uint64_t ChunkDataRawSize = 0;
+ IoHash ChunkDataHash;
+ CompressedBuffer AttachmentBuffer =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize);
+
+ if (!AttachmentBuffer)
+ {
+ throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += ChunkDataRawSize;
+
+ CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ OutputPackage.SetObject(Output);
+
+ ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
+ OutputPackage.GetAttachments().size(),
+ NiceBytes(TotalAttachmentBytes),
+ NiceBytes(TotalRawAttachmentBytes));
+
+ return OutputPackage;
+}
+
+void
+LocalProcessRunner::MonitorThreadFunction()
+{
+ SetCurrentThreadName("LocalProcessRunner_Monitor");
+
+ auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+ do
+ {
+ // On Windows it's possible to wait on process handles, so we wait for either a process to exit
+ // or for the monitor event to be signaled (which indicates we should check for cancellation
+ // or shutdown). This could be further improved by using a completion port and registering process
+ // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing
+ // with an enormous number of concurrent processes.
+ //
+ // On other platforms we just wait on the monitor event and poll for process exits at intervals.
+# if ZEN_PLATFORM_WINDOWS
+ auto WaitOnce = [&] {
+ HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS];
+
+ uint32_t NumHandles = 0;
+
+ WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle();
+
+ m_RunningLock.WithSharedLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It)
+ {
+ Ref<RunningAction> Action = It->second;
+
+ WaitHandles[NumHandles++] = Action->ProcessHandle;
+ }
+ });
+
+ DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000);
+
+ // return true if a handle was signaled
+ return (WaitResult <= NumHandles);
+ };
+# else
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); };
+# endif
+
+ while (!WaitOnce())
+ {
+ if (m_MonitorThreadEnabled == false)
+ {
+ return;
+ }
+
+ SweepRunningActions();
+ SampleRunningProcessCpu();
+ }
+
+ // Signal received
+
+ SweepRunningActions();
+ SampleRunningProcessCpu();
+ } while (m_MonitorThreadEnabled);
+}
+
void
LocalProcessRunner::CancelRunningActions()
{
	// Base class is not directly usable — platform subclasses override this
	// with logic that signals and reaps their running child processes
	// (e.g. MacProcessRunner sends SIGTERM, then SIGKILL). Intentionally a
	// no-op here.
}
+
+void
+LocalProcessRunner::SampleRunningProcessCpu()
+{
+ static constexpr uint64_t kSampleIntervalMs = 5'000;
+
+ m_RunningLock.WithSharedLock([&] {
+ const uint64_t Now = GetHifreqTimerValue();
+ for (auto& [Lsn, Running] : m_RunningMap)
+ {
+ const bool NeverSampled = Running->LastCpuSampleTicks == 0;
+ const bool IntervalElapsed = Stopwatch::GetElapsedTimeMs(Now - Running->LastCpuSampleTicks) >= kSampleIntervalMs;
+ if (NeverSampled || IntervalElapsed)
+ {
+ SampleProcessCpu(*Running);
+ }
+ }
+ });
+}
+
void
LocalProcessRunner::SweepRunningActions()
{
	// Intentionally only a trace marker in the base class: platform
	// subclasses override this to reap finished child processes and hand
	// them to ProcessCompletedActions().
	ZEN_TRACE_CPU("LocalProcessRunner::SweepRunningActions");
}
+
+void
+LocalProcessRunner::ProcessCompletedActions(std::vector<Ref<RunningAction>>& CompletedActions)
+{
+ ZEN_TRACE_CPU("LocalProcessRunner::ProcessCompletedActions");
+ // Shared post-processing: gather outputs, set state, clean sandbox.
+ // Note that this must be called without holding any local locks
+ // otherwise we may end up with deadlocks.
+
+ for (Ref<RunningAction> Running : CompletedActions)
+ {
+ const int ActionLsn = Running->Action->ActionLsn;
+
+ if (Running->ExitCode == 0)
+ {
+ try
+ {
+ // Gather outputs
+
+ CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath);
+
+ Running->Action->SetResult(std::move(OutputPackage));
+ Running->Action->SetActionState(RunnerAction::State::Completed);
+
+ // Enqueue sandbox for deferred background deletion, giving
+ // file handles time to close before we attempt removal.
+ m_DeferredDeleter.Enqueue(ActionLsn, std::move(Running->SandboxPath));
+
+ // Success -- continue with next iteration of the loop
+ continue;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+ }
+ }
+
+ // Failed - clean up the sandbox in the background.
+
+ m_DeferredDeleter.Enqueue(ActionLsn, std::move(Running->SandboxPath));
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
new file mode 100644
index 000000000..7493e980b
--- /dev/null
+++ b/src/zencompute/runners/localrunner.h
@@ -0,0 +1,138 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/thread.h>
+# include <zencore/zencore.h>
+# include <zenstore/cidstore.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+
+# include "deferreddeleter.h"
+
+# include <zencore/workthreadpool.h>
+
+# include <atomic>
+# include <filesystem>
+# include <optional>
+# include <thread>
+
+namespace zen {
+class CbPackage;
+}
+
+namespace zen::compute {
+
+/** Direct process spawner
+
+ This runner simply sets up a directory structure for each job and
+ creates a process to perform the computation in it. It is not very
+ efficient and is intended mostly for testing.
+
+ */
+
class LocalProcessRunner : public FunctionRunner
{
	// Non-movable: the monitor thread and lambdas capture 'this'.
	LocalProcessRunner(LocalProcessRunner&&) = delete;
	LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;

public:
	LocalProcessRunner(ChunkResolver& Resolver,
					   const std::filesystem::path& BaseDir,
					   DeferredDirectoryDeleter& Deleter,
					   WorkerThreadPool& WorkerPool,
					   int32_t MaxConcurrentActions = 0);
	~LocalProcessRunner();

	virtual void Shutdown() override;
	virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
	[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
	[[nodiscard]] virtual bool IsHealthy() override { return true; }
	[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
	[[nodiscard]] virtual size_t QueryCapacity() override;
	[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;

protected:
	LoggerRef Log() { return m_Log; }

	LoggerRef m_Log;

	// Book-keeping for one spawned child process.
	struct RunningAction : public RefCounted
	{
		Ref<RunnerAction> Action;
		// Platform-specific process identity: a HANDLE on Windows; on macOS
		// the pid_t is stored directly in the pointer value.
		void* ProcessHandle = nullptr;
		int ExitCode = 0;
		std::filesystem::path SandboxPath;

		// State for periodic CPU usage sampling
		uint64_t LastCpuSampleTicks = 0; // hifreq timer value at last sample
		uint64_t LastCpuOsTicks = 0;	 // OS CPU ticks (platform-specific units) at last sample
	};

	std::atomic_bool m_AcceptNewActions;
	ChunkResolver& m_ChunkResolver;
	// Guards manifesting of worker directories under m_WorkerPath.
	RwLock m_WorkerLock;
	std::filesystem::path m_WorkerPath;
	// Counter for naming new sandboxes — presumably consumed by
	// CreateNewSandbox(); confirm against localrunner.cpp.
	std::atomic<int32_t> m_SandboxCounter = 0;
	std::filesystem::path m_SandboxPath;
	int32_t m_MaxRunningActions = 64; // arbitrary limit for testing

	// if used in conjunction with m_ResultsLock, this lock must be taken *after*
	// m_ResultsLock to avoid deadlocks
	RwLock m_RunningLock;
	// Actions currently executing, keyed by action LSN.
	std::unordered_map<int, Ref<RunningAction>> m_RunningMap;

	std::atomic<int32_t> m_SubmittingCount = 0;
	DeferredDirectoryDeleter& m_DeferredDeleter;
	WorkerThreadPool& m_WorkerPool;

	// Background thread that reaps finished processes and samples CPU usage.
	std::thread m_MonitorThread;
	std::atomic<bool> m_MonitorThreadEnabled{true};
	Event m_MonitorThreadEvent;
	void MonitorThreadFunction();
	virtual void SweepRunningActions();
	virtual void CancelRunningActions();

	// Sample CPU usage for all currently running processes (throttled per-action).
	void SampleRunningProcessCpu();

	// Override in platform runners to sample one process. Called under a shared RunningLock.
	virtual void SampleProcessCpu(RunningAction& /*Running*/) {}

	// Shared preamble for SubmitAction: capacity check, sandbox creation,
	// worker manifesting, action writing, input manifesting.
	struct PreparedAction
	{
		int32_t ActionLsn;
		std::filesystem::path SandboxPath;
		std::filesystem::path WorkerPath;
		CbPackage WorkerPackage;
	};
	std::optional<PreparedAction> PrepareActionSubmission(Ref<RunnerAction> Action);

	// Shared post-processing for SweepRunningActions: gather outputs,
	// set state, clean sandbox.
	void ProcessCompletedActions(std::vector<Ref<RunningAction>>& CompletedActions);

	std::filesystem::path CreateNewSandbox();
	void ManifestWorker(const CbPackage& WorkerPackage,
						const std::filesystem::path& SandboxPath,
						std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
	std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
	CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);

	void DecompressAttachmentToFile(const CbPackage& FromPackage,
									CbObjectView FileEntry,
									const std::filesystem::path& SandboxRootPath,
									std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
new file mode 100644
index 000000000..5cec90699
--- /dev/null
+++ b/src/zencompute/runners/macrunner.cpp
@@ -0,0 +1,491 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "macrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_MAC
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+# include <fcntl.h>
+# include <libproc.h>
+# include <sandbox.h>
+# include <signal.h>
+# include <sys/wait.h>
+# include <unistd.h>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+namespace {
+
+ // All helper functions in this namespace are async-signal-safe (safe to call
+ // between fork() and execve()). They use only raw syscalls and avoid any
+ // heap allocation, stdio, or other non-AS-safe operations.
+
+ void WriteToFd(int Fd, const char* Buf, size_t Len)
+ {
+ while (Len > 0)
+ {
+ ssize_t Written = write(Fd, Buf, Len);
+ if (Written <= 0)
+ {
+ break;
+ }
+ Buf += Written;
+ Len -= static_cast<size_t>(Written);
+ }
+ }
+
	// Write "<Msg>[: <errno string>]" to the error pipe and terminate the
	// child with exit code 127 (conventional exec-failure code). Intended to
	// run between fork() and execve(), so only async-signal-safe operations
	// are used — lengths are computed with manual loops instead of strlen().
	// NOTE(review): strerror() is not formally async-signal-safe (it may
	// touch locale/static state); consider emitting the numeric errno
	// instead — confirm.
	[[noreturn]] void WriteErrorAndExit(int ErrorPipeFd, const char* Msg, int Errno)
	{
		// Write the message prefix
		size_t MsgLen = 0;
		for (const char* P = Msg; *P; ++P)
		{
			++MsgLen;
		}
		WriteToFd(ErrorPipeFd, Msg, MsgLen);

		// Append ": " and the errno string if non-zero
		if (Errno != 0)
		{
			WriteToFd(ErrorPipeFd, ": ", 2);
			const char* ErrStr = strerror(Errno);
			size_t ErrLen = 0;
			for (const char* P = ErrStr; *P; ++P)
			{
				++ErrLen;
			}
			WriteToFd(ErrorPipeFd, ErrStr, ErrLen);
		}

		_exit(127);
	}
+
+ // Build a Seatbelt profile string that denies everything by default and
+ // allows only the minimum needed for the worker to execute: process ops,
+ // system library reads, worker directory (read-only), and sandbox directory
+ // (read-write). Network access is denied implicitly by the deny-default policy.
+ std::string BuildSandboxProfile(const std::string& SandboxPath, const std::string& WorkerPath)
+ {
+ std::string Profile;
+ Profile.reserve(1024);
+
+ Profile += "(version 1)\n";
+ Profile += "(deny default)\n";
+ Profile += "(allow process*)\n";
+ Profile += "(allow sysctl-read)\n";
+ Profile += "(allow file-read-metadata)\n";
+
+ // System library paths needed for dynamic linker and runtime
+ Profile += "(allow file-read* (subpath \"/usr\"))\n";
+ Profile += "(allow file-read* (subpath \"/System\"))\n";
+ Profile += "(allow file-read* (subpath \"/Library\"))\n";
+ Profile += "(allow file-read* (subpath \"/dev\"))\n";
+ Profile += "(allow file-read* (subpath \"/private/var/db/dyld\"))\n";
+ Profile += "(allow file-read* (subpath \"/etc\"))\n";
+
+ // Worker directory: read-only
+ Profile += "(allow file-read* (subpath \"";
+ Profile += WorkerPath;
+ Profile += "\"))\n";
+
+ // Sandbox directory: read+write
+ Profile += "(allow file-read* file-write* (subpath \"";
+ Profile += SandboxPath;
+ Profile += "\"))\n";
+
+ return Profile;
+ }
+
+} // anonymous namespace
+
// Constructs the macOS runner. All heavy lifting (sandbox dirs, worker
// manifesting, monitor thread) is done by the LocalProcessRunner base;
// this ctor only records the sandboxing flag and fixes up SIGCHLD.
MacProcessRunner::MacProcessRunner(ChunkResolver& Resolver,
                                   const std::filesystem::path& BaseDir,
                                   DeferredDirectoryDeleter& Deleter,
                                   WorkerThreadPool& WorkerPool,
                                   bool Sandboxed,
                                   int32_t MaxConcurrentActions)
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_Sandboxed(Sandboxed)
{
	// Restore SIGCHLD to default behavior so waitpid() can properly collect
	// child exit status. zenserver/main.cpp sets SIGCHLD to SIG_IGN which
	// causes the kernel to auto-reap children, making waitpid() return
	// -1/ECHILD instead of the exit status we need.
	struct sigaction Action = {};
	sigemptyset(&Action.sa_mask);
	Action.sa_handler = SIG_DFL;
	sigaction(SIGCHLD, &Action, nullptr);

	if (m_Sandboxed)
	{
		ZEN_INFO("Seatbelt sandboxing enabled for child processes");
	}
}
+
// Spawn the worker process for one action via fork()/execve(), optionally
// under a Seatbelt sandbox. On success the child pid is recorded in
// m_RunningMap (as a void* handle) and the action transitions to Running;
// the monitor thread later reaps the process.
SubmitResult
MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
	ZEN_TRACE_CPU("MacProcessRunner::SubmitAction");
	// Shared preamble: capacity check, sandbox creation, worker manifesting.
	std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);

	if (!Prepared)
	{
		return SubmitResult{.IsAccepted = false};
	}

	// Build environment array from worker descriptor

	CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();

	// EnvStrings owns the storage; Envp holds borrowed char* pointers into it,
	// terminated by nullptr as execve() requires.
	std::vector<std::string> EnvStrings;
	for (auto& It : WorkerDescription["environment"sv])
	{
		EnvStrings.emplace_back(It.AsString());
	}

	std::vector<char*> Envp;
	Envp.reserve(EnvStrings.size() + 1);
	for (auto& Str : EnvStrings)
	{
		Envp.push_back(Str.data());
	}
	Envp.push_back(nullptr);

	// Build argv: <worker_exe_path> -Build=build.action

	std::string_view ExecPath = WorkerDescription["path"sv].AsString();
	std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath);
	std::string ExePathStr = ExePath.string();
	std::string BuildArg = "-Build=build.action";

	std::vector<char*> ArgV;
	ArgV.push_back(ExePathStr.data());
	ArgV.push_back(BuildArg.data());
	ArgV.push_back(nullptr);

	ZEN_DEBUG("Executing: {} {} (sandboxed={})", ExePathStr, BuildArg, m_Sandboxed);

	std::string SandboxPathStr = Prepared->SandboxPath.string();
	std::string WorkerPathStr = Prepared->WorkerPath.string();

	// Pre-fork: build sandbox profile and create error pipe
	// (everything that may allocate must happen before fork(); the child may
	// only use async-signal-safe operations).
	std::string SandboxProfile;
	int ErrorPipe[2] = {-1, -1};

	if (m_Sandboxed)
	{
		SandboxProfile = BuildSandboxProfile(SandboxPathStr, WorkerPathStr);

		if (pipe(ErrorPipe) != 0)
		{
			throw zen::runtime_error("pipe() for sandbox error pipe failed: {}", strerror(errno));
		}
		// CLOEXEC on both ends: a successful execve() closes the child's
		// write end, which the parent observes as EOF ("no error").
		fcntl(ErrorPipe[0], F_SETFD, FD_CLOEXEC);
		fcntl(ErrorPipe[1], F_SETFD, FD_CLOEXEC);
	}

	pid_t ChildPid = fork();

	if (ChildPid < 0)
	{
		// fork() failed — release the pipe fds before reporting.
		int SavedErrno = errno;
		if (m_Sandboxed)
		{
			close(ErrorPipe[0]);
			close(ErrorPipe[1]);
		}
		throw zen::runtime_error("fork() failed: {}", strerror(SavedErrno));
	}

	if (ChildPid == 0)
	{
		// Child process

		if (m_Sandboxed)
		{
			// Close read end of error pipe — child only writes
			close(ErrorPipe[0]);

			// Apply Seatbelt sandbox profile
			char* ErrorBuf = nullptr;
			if (sandbox_init(SandboxProfile.c_str(), 0, &ErrorBuf) != 0)
			{
				// sandbox_init failed — write error to pipe and exit
				if (ErrorBuf)
				{
					WriteErrorAndExit(ErrorPipe[1], ErrorBuf, 0);
					// WriteErrorAndExit does not return, but sandbox_free_error
					// is not needed since we _exit
				}
				WriteErrorAndExit(ErrorPipe[1], "sandbox_init failed", errno);
			}
			if (ErrorBuf)
			{
				sandbox_free_error(ErrorBuf);
			}

			if (chdir(SandboxPathStr.c_str()) != 0)
			{
				WriteErrorAndExit(ErrorPipe[1], "chdir to sandbox failed", errno);
			}

			execve(ExePathStr.c_str(), ArgV.data(), Envp.data());

			// Only reached when execve() itself failed.
			WriteErrorAndExit(ErrorPipe[1], "execve failed", errno);
		}
		else
		{
			// Non-sandboxed child has no error pipe; failures surface only
			// as exit code 127 picked up by the sweep.
			if (chdir(SandboxPathStr.c_str()) != 0)
			{
				_exit(127);
			}

			execve(ExePathStr.c_str(), ArgV.data(), Envp.data());
			_exit(127);
		}
	}

	// Parent process

	if (m_Sandboxed)
	{
		// Close write end of error pipe — parent only reads
		close(ErrorPipe[1]);

		// Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
		// and read returns 0. If setup failed, child wrote an error message.
		char ErrBuf[512];
		ssize_t BytesRead = read(ErrorPipe[0], ErrBuf, sizeof(ErrBuf) - 1);
		close(ErrorPipe[0]);

		if (BytesRead > 0)
		{
			// Sandbox setup or execve failed
			ErrBuf[BytesRead] = '\0';

			// Reap the child (it called _exit(127))
			waitpid(ChildPid, nullptr, 0);

			// Clean up the sandbox in the background
			m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));

			ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);

			Action->SetActionState(RunnerAction::State::Failed);
			return SubmitResult{.IsAccepted = false};
		}
	}

	// Store child pid as void* (same convention as zencore/process.cpp)

	Ref<RunningAction> NewAction{new RunningAction()};
	NewAction->Action = Action;
	NewAction->ProcessHandle = reinterpret_cast<void*>(static_cast<intptr_t>(ChildPid));
	NewAction->SandboxPath = std::move(Prepared->SandboxPath);

	{
		RwLock::ExclusiveLockScope _(m_RunningLock);
		m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
	}

	Action->SetActionState(RunnerAction::State::Running);

	return SubmitResult{.IsAccepted = true};
}
+
+void
+MacProcessRunner::SweepRunningActions()
+{
+ ZEN_TRACE_CPU("MacProcessRunner::SweepRunningActions");
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Running = It->second;
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+ int Status = 0;
+
+ pid_t Result = waitpid(Pid, &Status, WNOHANG);
+
+ if (Result == Pid)
+ {
+ if (WIFEXITED(Status))
+ {
+ Running->ExitCode = WEXITSTATUS(Status);
+ }
+ else if (WIFSIGNALED(Status))
+ {
+ Running->ExitCode = 128 + WTERMSIG(Status);
+ }
+ else
+ {
+ Running->ExitCode = 1;
+ }
+
+ Running->ProcessHandle = nullptr;
+
+ CompletedActions.push_back(std::move(Running));
+ It = m_RunningMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ ProcessCompletedActions(CompletedActions);
+}
+
// Terminate every running child: take ownership of the running map (so the
// sweep cannot race us on waitpid), send SIGTERM to all, then give each
// process up to 2 seconds to exit before escalating to SIGKILL.
void
MacProcessRunner::CancelRunningActions()
{
	ZEN_TRACE_CPU("MacProcessRunner::CancelRunningActions");
	Stopwatch Timer;
	std::unordered_map<int, Ref<RunningAction>> RunningMap;

	// Swap the map out under the lock; from here on this thread is the sole
	// owner of these actions and their pids.
	m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });

	if (RunningMap.empty())
	{
		return;
	}

	ZEN_INFO("cancelling all running actions");

	// Send SIGTERM to all running processes first

	for (const auto& [Lsn, Running] : RunningMap)
	{
		pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));

		if (kill(Pid, SIGTERM) != 0)
		{
			ZEN_WARN("kill(SIGTERM) for LSN {} (pid {}) failed: {}", Running->Action->ActionLsn, Pid, strerror(errno));
		}
	}

	// Wait for all processes, regardless of whether SIGTERM succeeded, then clean up.

	for (auto& [Lsn, Running] : RunningMap)
	{
		pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));

		// Poll for up to 2 seconds
		bool Exited = false;
		for (int i = 0; i < 20; ++i)
		{
			int Status = 0;
			pid_t WaitResult = waitpid(Pid, &Status, WNOHANG);
			if (WaitResult == Pid)
			{
				Exited = true;
				ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
				break;
			}
			usleep(100000); // 100ms
		}

		if (!Exited)
		{
			// Escalate: SIGKILL cannot be ignored, then reap synchronously.
			ZEN_WARN("LSN {}: process did not exit after SIGTERM, sending SIGKILL", Running->Action->ActionLsn);
			kill(Pid, SIGKILL);
			waitpid(Pid, nullptr, 0);
		}

		// Sandbox cleanup is deferred so lingering handles can close first.
		m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
		Running->Action->SetActionState(RunnerAction::State::Failed);
	}

	ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
+
+bool
+MacProcessRunner::CancelAction(int ActionLsn)
+{
+ ZEN_TRACE_CPU("MacProcessRunner::CancelAction");
+
+ // Hold the shared lock while sending the signal to prevent the sweep thread
+ // from reaping the PID (via waitpid) between our lookup and kill(). Without
+ // the lock held, the PID could be recycled by the kernel and we'd signal an
+ // unrelated process.
+ bool Sent = false;
+
+ m_RunningLock.WithSharedLock([&] {
+ auto It = m_RunningMap.find(ActionLsn);
+ if (It == m_RunningMap.end())
+ {
+ return;
+ }
+
+ Ref<RunningAction> Target = It->second;
+ if (!Target->ProcessHandle)
+ {
+ return;
+ }
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Target->ProcessHandle));
+
+ if (kill(Pid, SIGTERM) != 0)
+ {
+ ZEN_WARN("CancelAction: kill(SIGTERM) for LSN {} (pid {}) failed: {}", ActionLsn, Pid, strerror(errno));
+ return;
+ }
+
+ ZEN_DEBUG("CancelAction: sent SIGTERM to LSN {} (pid {})", ActionLsn, Pid);
+ Sent = true;
+ });
+
+ // The monitor thread will pick up the process exit and mark the action as Failed.
+ return Sent;
+}
+
+void
+MacProcessRunner::SampleProcessCpu(RunningAction& Running)
+{
+ const pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running.ProcessHandle));
+
+ struct proc_taskinfo Info;
+ if (proc_pidinfo(Pid, PROC_PIDTASKINFO, 0, &Info, sizeof(Info)) <= 0)
+ {
+ return;
+ }
+
+ // pti_total_user and pti_total_system are in nanoseconds
+ const uint64_t CurrentOsTicks = Info.pti_total_user + Info.pti_total_system;
+ const uint64_t NowTicks = GetHifreqTimerValue();
+
+ // Cumulative CPU seconds (absolute, available from first sample): ns → seconds
+ Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 1'000'000'000.0), std::memory_order_relaxed);
+
+ if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
+ {
+ const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - Running.LastCpuSampleTicks);
+ if (ElapsedMs > 0)
+ {
+ const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
+ // ns → ms: divide by 1,000,000; then as percent of elapsed ms
+ const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 1'000'000.0 / ElapsedMs * 100.0);
+ Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
+ }
+ }
+
+ Running.LastCpuSampleTicks = NowTicks;
+ Running.LastCpuOsTicks = CurrentOsTicks;
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/macrunner.h b/src/zencompute/runners/macrunner.h
new file mode 100644
index 000000000..d653b923a
--- /dev/null
+++ b/src/zencompute/runners/macrunner.h
@@ -0,0 +1,43 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_MAC
+
+namespace zen::compute {
+
+/** Native macOS process runner for executing Mac worker executables directly.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and monitor thread infrastructure. Overrides only the
+ platform-specific methods: process spawning, sweep, and cancellation.
+
+ When Sandboxed is true, child processes are isolated using macOS Seatbelt
+ (sandbox_init): no network access and no filesystem access outside the
+ explicitly allowed sandbox and worker directories. This requires no elevation.
+ */
class MacProcessRunner : public LocalProcessRunner
{
public:
	// Sandboxed: when true, children are confined with a deny-default
	// Seatbelt profile (see macrunner.cpp). Remaining parameters are
	// forwarded to LocalProcessRunner.
	MacProcessRunner(ChunkResolver& Resolver,
					 const std::filesystem::path& BaseDir,
					 DeferredDirectoryDeleter& Deleter,
					 WorkerThreadPool& WorkerPool,
					 bool Sandboxed = false,
					 int32_t MaxConcurrentActions = 0);

	// Platform-specific overrides: fork/exec spawning, waitpid-based reaping,
	// SIGTERM/SIGKILL cancellation, libproc CPU sampling.
	[[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
	void SweepRunningActions() override;
	void CancelRunningActions() override;
	bool CancelAction(int ActionLsn) override;
	void SampleProcessCpu(RunningAction& Running) override;

private:
	// Whether child processes run under the Seatbelt sandbox (set at construction).
	bool m_Sandboxed = false;
};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
new file mode 100644
index 000000000..672636d06
--- /dev/null
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -0,0 +1,618 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remotehttprunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/scopeguard.h>
+# include <zencore/system.h>
+# include <zencore/trace.h>
+# include <zenhttp/httpcommon.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
/** Construct a runner that forwards compute actions to the remote node at HostName.

    The HTTP client is rooted at "<HostName>/compute". A monitor thread is started
    immediately and polls the remote for completed jobs until Shutdown() is called.
    m_InstanceId is generated once here and used as a stable identity for remote
    queue idempotency keys.
 */
RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
                                   const std::filesystem::path& BaseDir,
                                   std::string_view HostName,
                                   WorkerThreadPool& InWorkerPool)
: FunctionRunner(BaseDir)
, m_Log(logging::Get("http_exec"))
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
{
    // Started last so every member above is fully initialized before the thread runs
    m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
    // Stop and join the monitor thread before any members are destroyed
    Shutdown();
}
+
/** Stop the monitor thread. Safe to call more than once (dtor also calls it). */
void
RemoteHttpRunner::Shutdown()
{
    // TODO: should cleanly drain/cancel pending work

    // Order matters: clear the run flag first, then signal the event so the
    // monitor thread wakes, observes the cleared flag, and exits its loop.
    m_MonitorThreadEnabled = false;
    m_MonitorThreadEvent.Set();
    if (m_MonitorThread.joinable())
    {
        m_MonitorThread.join();
    }
}
+
+void
+RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+ ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");
+ const IoHash WorkerId = WorkerPackage.GetObjectHash();
+ CbPackage WorkerDesc = WorkerPackage;
+
+ std::string WorkerUrl = fmt::format("/workers/{}", WorkerId);
+
+ HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl);
+
+ if (WorkerResponse.StatusCode == HttpResponseCode::NotFound)
+ {
+ HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject());
+
+ if (DescResponse.StatusCode == HttpResponseCode::NotFound)
+ {
+ CbPackage Pkg = WorkerDesc;
+
+ // Build response package by sending only the attachments
+ // the other end needs. We start with the full package and
+ // remove the attachments which are not needed.
+
+ {
+ std::unordered_set<IoHash> Needed;
+
+ CbObject Response = DescResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ Needed.insert(NeedHash);
+ }
+
+ std::unordered_set<IoHash> ToRemove;
+
+ for (const CbAttachment& Attachment : Pkg.GetAttachments())
+ {
+ const IoHash& Hash = Attachment.GetHash();
+
+ if (Needed.find(Hash) == Needed.end())
+ {
+ ToRemove.insert(Hash);
+ }
+ }
+
+ for (const IoHash& Hash : ToRemove)
+ {
+ int RemovedCount = Pkg.RemoveAttachment(Hash);
+
+ ZEN_ASSERT(RemovedCount == 1);
+ }
+ }
+
+ // Post resulting package
+
+ HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg);
+
+ if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+ // TODO: propagate error
+ }
+ }
+ else if (!IsHttpSuccessCode(DescResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+ // TODO: propagate error
+ }
+ else
+ {
+ ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent);
+ }
+ }
+ else if (WorkerResponse.StatusCode == HttpResponseCode::OK)
+ {
+ // Already known from a previous run
+ }
+ else if (!IsHttpSuccessCode(WorkerResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})",
+ WorkerId,
+ m_Http.GetBaseUri(),
+ WorkerUrl,
+ (int)WorkerResponse.StatusCode,
+ ToString(WorkerResponse.StatusCode));
+
+ // TODO: propagate error
+ }
+}
+
+size_t
+RemoteHttpRunner::QueryCapacity()
+{
+ // Estimate how much more work we're ready to accept
+
+ RwLock::SharedLockScope _{m_RunningLock};
+
+ size_t RunningCount = m_RemoteRunningMap.size();
+
+ if (RunningCount >= size_t(m_MaxRunningActions))
+ {
+ return 0;
+ }
+
+ return m_MaxRunningActions - RunningCount;
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions");
+
+ if (Actions.size() <= 1)
+ {
+ std::vector<SubmitResult> Results;
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Results.push_back(SubmitAction(Action));
+ }
+
+ return Results;
+ }
+
+ // For larger batches, submit HTTP requests in parallel via the shared worker pool
+
+ std::vector<std::future<SubmitResult>> Futures;
+ Futures.reserve(Actions.size());
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+
+ Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+ }
+
+ std::vector<SubmitResult> Results;
+ Results.reserve(Futures.size());
+
+ for (auto& Future : Futures)
+ {
+ Results.push_back(Future.get());
+ }
+
+ return Results;
+}
+
+SubmitResult
+RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ ZEN_TRACE_CPU("RemoteHttpRunner::SubmitAction");
+
+ // Verify whether we can accept more work
+
+ {
+ RwLock::SharedLockScope _{m_RunningLock};
+ if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+ }
+
+ using namespace std::literals;
+
+ // Each enqueued action is assigned an integer index (logical sequence number),
+ // which we use as a key for tracking data structures and as an opaque id which
+ // may be used by clients to reference the scheduled action
+
+ Action->ExecutionLocation = m_HostName;
+
+ const int32_t ActionLsn = Action->ActionLsn;
+ const CbObject& ActionObj = Action->ActionObj;
+ const IoHash ActionId = ActionObj.GetHash();
+
+ MaybeDumpAction(ActionLsn, ActionObj);
+
+ // Determine the submission URL. If the action belongs to a queue, ensure a
+ // corresponding remote queue exists on the target node and submit via it.
+
+ std::string SubmitUrl = "/jobs";
+ if (const int QueueId = Action->QueueId; QueueId != 0)
+ {
+ CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId);
+ CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId);
+ if (Oid Token = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig); Token != Oid::Zero)
+ {
+ SubmitUrl = fmt::format("/queues/{}/jobs", Token);
+ }
+ }
+
+ // Enqueue job. If the remote returns FailedDependency (424), it means it
+ // cannot resolve the worker/function — re-register the worker and retry once.
+
+ CbObject Result;
+ HttpClient::Response WorkResponse;
+ HttpResponseCode WorkResponseCode{};
+
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
+ WorkResponseCode = WorkResponse.StatusCode;
+
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
+
+ RegisterWorker(Action->Worker.Descriptor);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (WorkResponseCode == HttpResponseCode::OK)
+ {
+ Result = WorkResponse.AsObject();
+ }
+ else if (WorkResponseCode == HttpResponseCode::NotFound)
+ {
+ // Not all attachments are present
+
+ // Build response package including all required attachments
+
+ CbPackage Pkg;
+ Pkg.SetObject(ActionObj);
+
+ CbObject Response = WorkResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ }
+ else
+ {
+ // No such attachment
+
+ return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ }
+ }
+
+ // Post resulting package
+
+ HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (!PayloadResponse)
+ {
+ ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+
+ // TODO: include more information about the failure in the response
+
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+ else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
+ {
+ Result = PayloadResponse.AsObject();
+ }
+ else
+ {
+ // Unexpected response
+
+ const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
+
+ ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})",
+ ActionId,
+ m_Http.GetBaseUri(),
+ SubmitUrl,
+ ResponseStatusCode,
+ ToString(ResponseStatusCode));
+
+ return {.IsAccepted = false,
+ .Reason = fmt::format("unexpected response code {} {} from {}{}",
+ ResponseStatusCode,
+ ToString(ResponseStatusCode),
+ m_Http.GetBaseUri(),
+ SubmitUrl)};
+ }
+ }
+
+ if (Result)
+ {
+ if (const int32_t LsnField = Result["lsn"].AsInt32(0))
+ {
+ HttpRunningAction NewAction;
+ NewAction.Action = Action;
+ NewAction.RemoteActionLsn = LsnField;
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+
+ m_RemoteRunningMap[LsnField] = std::move(NewAction);
+ }
+
+ ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn);
+
+ Action->SetActionState(RunnerAction::State::Running);
+
+ return SubmitResult{.IsAccepted = true};
+ }
+ }
+
+ return {};
+}
+
/** Ensure a remote queue mirroring local queue QueueId exists on the target node.

    @return the remote queue token, or Oid::Zero on failure.

    Tokens are cached in m_RemoteQueueTokens, so only the first caller per queue
    performs an HTTP round trip. Concurrent first callers may race; the server
    deduplicates via the idempotency key, and try_emplace below makes all local
    callers agree on a single token.
 */
Oid
RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config)
{
    // Fast path: token already cached
    {
        RwLock::SharedLockScope _(m_QueueTokenLock);
        if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end())
        {
            return It->second;
        }
    }

    // Build a stable idempotency key that uniquely identifies this (runner instance, local queue)
    // pair. The server uses this to return the same remote queue token for concurrent or redundant
    // requests, preventing orphaned remote queues when multiple threads race through here.
    // Also send hostname so the server can associate the queue with its origin for diagnostics.
    CbObjectWriter Body;
    Body << "idempotency_key"sv << fmt::format("{}/{}", m_InstanceId, QueueId);
    Body << "hostname"sv << GetMachineName();
    if (Metadata)
    {
        Body << "metadata"sv << Metadata;
    }
    if (Config)
    {
        Body << "config"sv << Config;
    }

    HttpClient::Response Resp = m_Http.Post("/queues/remote", Body.Save());
    if (!Resp)
    {
        ZEN_WARN("failed to create remote queue for local queue {} on {}", QueueId, m_HostName);
        return Oid::Zero;
    }

    Oid Token = Oid::TryFromHexString(Resp.AsObject()["queue_token"sv].AsString());
    if (Token == Oid::Zero)
    {
        // Malformed/empty token in response — treat as failure
        return Oid::Zero;
    }

    ZEN_DEBUG("created remote queue '{}' for local queue {} on {}", Token, QueueId, m_HostName);

    // try_emplace keeps the first inserted token if another thread won the race;
    // returning It->second (not Token) keeps all callers consistent.
    RwLock::ExclusiveLockScope _(m_QueueTokenLock);
    auto [It, Inserted] = m_RemoteQueueTokens.try_emplace(QueueId, Token);
    return It->second;
}
+
+void
+RemoteHttpRunner::CancelRemoteQueue(int QueueId)
+{
+ Oid Token;
+ {
+ RwLock::SharedLockScope _(m_QueueTokenLock);
+ if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end())
+ {
+ Token = It->second;
+ }
+ }
+
+ if (Token == Oid::Zero)
+ {
+ return;
+ }
+
+ HttpClient::Response Resp = m_Http.Delete(fmt::format("/queues/{}", Token));
+
+ if (Resp.StatusCode == HttpResponseCode::NoContent)
+ {
+ ZEN_DEBUG("cancelled remote queue '{}' (local queue {}) on {}", Token, QueueId, m_HostName);
+ }
+ else
+ {
+ ZEN_WARN("failed to cancel remote queue '{}' on {}: {}", Token, m_HostName, int(Resp.StatusCode));
+ }
+}
+
+bool
+RemoteHttpRunner::IsHealthy()
+{
+ if (HttpClient::Response Ready = m_Http.Get("/ready"))
+ {
+ return true;
+ }
+ else
+ {
+ // TODO: use response to propagate context
+ return false;
+ }
+}
+
+size_t
+RemoteHttpRunner::GetSubmittedActionCount()
+{
+ RwLock::SharedLockScope _(m_RunningLock);
+ return m_RemoteRunningMap.size();
+}
+
/** Monitor thread body: periodically sweeps the remote for completed actions.

    The wait interval adapts to load: it shrinks when many actions are in flight
    or when the previous sweep retired work, and relaxes back to the default when
    idle. The event is signalled either spuriously or by Shutdown(); after each
    signal one final sweep runs so completions are not lost on shutdown.
 */
void
RemoteHttpRunner::MonitorThreadFunction()
{
    SetCurrentThreadName("RemoteHttpRunner_Monitor");

    do
    {
        const int NormalWaitingTime = 200;
        int WaitTimeMs = NormalWaitingTime;
        // Wait() returns false on timeout, true when the event was signalled
        auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
        auto SweepOnce = [&] {
            const size_t RetiredCount = SweepRunningActions();

            // Adjust polling cadence based on in-flight count and recent activity
            m_RunningLock.WithSharedLock([&] {
                if (m_RemoteRunningMap.size() > 16)
                {
                    WaitTimeMs = NormalWaitingTime / 4;
                }
                else
                {
                    if (RetiredCount)
                    {
                        WaitTimeMs = NormalWaitingTime / 2;
                    }
                    else
                    {
                        WaitTimeMs = NormalWaitingTime;
                    }
                }
            });
        };

        // Sweep on every timeout until the event is signalled
        while (!WaitOnce())
        {
            SweepOnce();
        }

        // Signal received - this may mean we should quit

        SweepOnce();
    } while (m_MonitorThreadEnabled);
}
+
/** Poll the remote for completed jobs, retire them locally, and notify owners.

    @return the number of actions retired this sweep.

    Lock discipline: completed entries are moved out of m_RemoteRunningMap under
    the exclusive lock, but owner notification (SetResult/SetActionState) happens
    strictly after all locks are released to avoid deadlocks with callbacks.
 */
size_t
RemoteHttpRunner::SweepRunningActions()
{
    ZEN_TRACE_CPU("RemoteHttpRunner::SweepRunningActions");
    std::vector<HttpRunningAction> CompletedActions;

    // Poll remote for list of completed actions

    HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv);

    if (CbObject Completed = ResponseCompleted.AsObject())
    {
        for (auto& FieldIt : Completed["completed"sv])
        {
            CbObjectView EntryObj = FieldIt.AsObjectView();
            const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
            std::string_view StateName = EntryObj["state"sv].AsString();

            RunnerAction::State RemoteState = RunnerAction::FromString(StateName);

            // Always fetch to drain the result from the remote's results map,
            // but only keep the result package for successfully completed actions.
            HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn));

            m_RunningLock.WithExclusiveLock([&] {
                if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end())
                {
                    HttpRunningAction CompletedAction = std::move(CompleteIt->second);
                    CompletedAction.RemoteState = RemoteState;

                    if (RemoteState == RunnerAction::State::Completed && ResponseJob)
                    {
                        CompletedAction.ActionResults = ResponseJob.AsPackage();
                    }

                    CompletedActions.push_back(std::move(CompletedAction));
                    m_RemoteRunningMap.erase(CompleteIt);
                }
                else
                {
                    // we received a completion notice for an action we don't know about,
                    // this can happen if the runner is used by multiple upstream schedulers,
                    // or if this compute node was recently restarted and lost track of
                    // previously scheduled actions
                }
            });
        }

        // Opportunistically adapt our concurrency cap to the remote's reported
        // logical processor count (only ever adjusts downward, floor of 4)
        if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView())
        {
            // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0))
            if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0))
            {
                const int32_t NewCap = zen::Max(4, CpuCount);

                if (m_MaxRunningActions > NewCap)
                {
                    ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions);

                    m_MaxRunningActions = NewCap;
                }
            }
        }
    }

    // Notify outer. Note that this has to be done without holding any local locks
    // otherwise we may end up with deadlocks.

    for (HttpRunningAction& HttpAction : CompletedActions)
    {
        const int ActionLsn = HttpAction.Action->ActionLsn;

        ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
                  HttpAction.Action->ActionId,
                  ActionLsn,
                  HttpAction.RemoteActionLsn,
                  RunnerAction::ToString(HttpAction.RemoteState));

        if (HttpAction.RemoteState == RunnerAction::State::Completed)
        {
            HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
        }

        HttpAction.Action->SetActionState(HttpAction.RemoteState);
    }

    return CompletedActions.size();
}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
new file mode 100644
index 000000000..9119992a9
--- /dev/null
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+# include <zencore/uid.h>
+# include <zencore/workthreadpool.h>
+# include <zencore/zencore.h>
+# include <zenhttp/httpclient.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+# include <unordered_map>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+/** HTTP-based runner
+
+ This implements a DDC remote compute execution strategy via REST API
+
+ */
+
class RemoteHttpRunner : public FunctionRunner
{
    RemoteHttpRunner(RemoteHttpRunner&&) = delete;
    RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;

public:
    /** Construct a runner targeting the remote compute node at HostName.
        Starts a monitor thread which polls the remote until Shutdown(). */
    RemoteHttpRunner(ChunkResolver& InChunkResolver,
                     const std::filesystem::path& BaseDir,
                     std::string_view HostName,
                     WorkerThreadPool& InWorkerPool);
    ~RemoteHttpRunner();

    // FunctionRunner interface; see the .cpp for per-method protocol details
    virtual void Shutdown() override;
    virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
    [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
    [[nodiscard]] virtual bool IsHealthy() override;
    [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
    [[nodiscard]] virtual size_t QueryCapacity() override;
    [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
    virtual void CancelRemoteQueue(int QueueId) override;

    std::string_view GetHostName() const { return m_HostName; }

protected:
    LoggerRef Log() { return m_Log; }

private:
    LoggerRef m_Log;
    ChunkResolver& m_ChunkResolver;  // resolves missing attachment payloads on submission
    WorkerThreadPool& m_WorkerPool;  // used for parallel batch submission
    std::string m_HostName;
    std::string m_BaseUrl;  // "<HostName>/compute"
    HttpClient m_Http;

    int32_t m_MaxRunningActions = 256; // arbitrary limit for testing

    // Book-keeping for one action in flight on the remote node
    struct HttpRunningAction
    {
        Ref<RunnerAction> Action;
        int RemoteActionLsn = 0; // Remote LSN
        RunnerAction::State RemoteState = RunnerAction::State::Failed;
        CbPackage ActionResults;  // populated only when RemoteState == Completed
    };

    RwLock m_RunningLock;  // guards m_RemoteRunningMap
    std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn

    std::thread m_MonitorThread;
    std::atomic<bool> m_MonitorThreadEnabled{true};
    Event m_MonitorThreadEvent;  // signalled by Shutdown() to stop the monitor thread
    void MonitorThreadFunction();
    size_t SweepRunningActions();

    RwLock m_QueueTokenLock;  // guards m_RemoteQueueTokens
    std::unordered_map<int, Oid> m_RemoteQueueTokens; // local QueueId → remote queue token

    // Stable identity for this runner instance, used as part of the idempotency key when
    // creating remote queues. Generated once at construction and never changes.
    Oid m_InstanceId;

    Oid EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config);
};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
new file mode 100644
index 000000000..e9a1ae8b6
--- /dev/null
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -0,0 +1,460 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "windowsrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_WINDOWS
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/scopeguard.h>
+# include <zencore/trace.h>
+# include <zencore/system.h>
+# include <zencore/timer.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <userenv.h>
+# include <aclapi.h>
+# include <sddl.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::compute {
+
+using namespace std::literals;
+
/** Construct a Windows runner; when Sandboxed, create the AppContainer profile.

    The profile name embeds the current PID so multiple server instances do not
    collide. The SID returned by CreateAppContainerProfile is kept for the
    lifetime of the runner and released in the destructor.

    @throws zen::runtime_error if the AppContainer profile cannot be created.
 */
WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
                                           const std::filesystem::path& BaseDir,
                                           DeferredDirectoryDeleter& Deleter,
                                           WorkerThreadPool& WorkerPool,
                                           bool Sandboxed,
                                           int32_t MaxConcurrentActions)
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_Sandboxed(Sandboxed)
{
    if (!m_Sandboxed)
    {
        return;
    }

    // Build a unique profile name per process to avoid collisions
    m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId());

    // Clean up any stale profile from a previous crash
    DeleteAppContainerProfile(m_AppContainerName.c_str());

    PSID Sid = nullptr;

    // No capabilities are granted, which also blocks network access by default
    HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(),
                                           m_AppContainerName.c_str(), // display name
                                           m_AppContainerName.c_str(), // description
                                           nullptr,                    // no capabilities
                                           0,                          // capability count
                                           &Sid);

    if (FAILED(Hr))
    {
        throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr));
    }

    m_AppContainerSid = Sid;

    ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName));
}
+
WindowsProcessRunner::~WindowsProcessRunner()
{
    // Release the SID allocated by CreateAppContainerProfile
    if (m_AppContainerSid)
    {
        FreeSid(m_AppContainerSid);
        m_AppContainerSid = nullptr;
    }

    // Remove our per-process profile so profiles do not accumulate on the machine
    if (!m_AppContainerName.empty())
    {
        DeleteAppContainerProfile(m_AppContainerName.c_str());
    }
}
+
/** Grant the runner's AppContainer SID the given access mask on a filesystem path.

    Reads the path's existing DACL, merges in an inheritable ACE for the
    AppContainer SID, and writes the DACL back. Children of the AppContainer have
    no filesystem access except through entries added here.

    @param Path       directory (or file) to grant access on
    @param AccessMask e.g. FILE_ALL_ACCESS or FILE_GENERIC_READ|FILE_GENERIC_EXECUTE
    @throws zen::runtime_error if any of the three security API calls fails
 */
void
WindowsProcessRunner::GrantAppContainerAccess(const std::filesystem::path& Path, DWORD AccessMask)
{
    PACL ExistingDacl = nullptr;
    PSECURITY_DESCRIPTOR SecurityDescriptor = nullptr;

    DWORD Result = GetNamedSecurityInfoW(Path.c_str(),
                                         SE_FILE_OBJECT,
                                         DACL_SECURITY_INFORMATION,
                                         nullptr,
                                         nullptr,
                                         &ExistingDacl,
                                         nullptr,
                                         &SecurityDescriptor);

    if (Result != ERROR_SUCCESS)
    {
        throw zen::runtime_error("GetNamedSecurityInfoW failed for '{}': {}", Path.string(), GetSystemErrorAsString(Result));
    }

    // ExistingDacl points into SecurityDescriptor, which we own and must free
    auto $0 = MakeGuard([&] { LocalFree(SecurityDescriptor); });

    EXPLICIT_ACCESSW Access{};
    Access.grfAccessPermissions = AccessMask;
    Access.grfAccessMode = SET_ACCESS;
    // Inherit to both files and subdirectories created beneath Path
    Access.grfInheritance = OBJECT_INHERIT_ACE | CONTAINER_INHERIT_ACE;
    Access.Trustee.TrusteeForm = TRUSTEE_IS_SID;
    Access.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
    Access.Trustee.ptstrName = static_cast<LPWSTR>(m_AppContainerSid);

    PACL NewDacl = nullptr;

    // Merge the new ACE with the existing DACL rather than replacing it
    Result = SetEntriesInAclW(1, &Access, ExistingDacl, &NewDacl);
    if (Result != ERROR_SUCCESS)
    {
        throw zen::runtime_error("SetEntriesInAclW failed for '{}': {}", Path.string(), GetSystemErrorAsString(Result));
    }

    auto $1 = MakeGuard([&] { LocalFree(NewDacl); });

    Result = SetNamedSecurityInfoW(const_cast<LPWSTR>(Path.c_str()),
                                   SE_FILE_OBJECT,
                                   DACL_SECURITY_INFORMATION,
                                   nullptr,
                                   nullptr,
                                   NewDacl,
                                   nullptr);

    if (Result != ERROR_SUCCESS)
    {
        throw zen::runtime_error("SetNamedSecurityInfoW failed for '{}': {}", Path.string(), GetSystemErrorAsString(Result));
    }
}
+
/** Spawn the worker process for an action, optionally inside an AppContainer.

    Prepares the sandbox via the base class, builds the environment block and
    command line from the worker descriptor, launches the process (without
    waiting), and records it in m_RunningMap for the sweep thread to reap.

    @return IsAccepted=false if PrepareActionSubmission declined the action.
    @throws via zen::ThrowLastError if process creation fails.
 */
SubmitResult
WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
    ZEN_TRACE_CPU("WindowsProcessRunner::SubmitAction");
    std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);

    if (!Prepared)
    {
        return SubmitResult{.IsAccepted = false};
    }

    // Set up environment variables

    CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();

    // Double-NUL-terminated list of "NAME=value" strings, as CreateProcess expects.
    // NOTE(review): this is a narrow (ANSI) block passed to CreateProcessW without
    // CREATE_UNICODE_ENVIRONMENT, which is legal but limits values to ANSI — confirm
    // worker descriptors never carry non-ANSI environment values.
    StringBuilder<1024> EnvironmentBlock;

    for (auto& It : WorkerDescription["environment"sv])
    {
        EnvironmentBlock.Append(It.AsString());
        EnvironmentBlock.Append('\0');
    }
    EnvironmentBlock.Append('\0');
    EnvironmentBlock.Append('\0');

    // Execute process - this spawns the child process immediately without waiting
    // for completion

    std::string_view ExecPath = WorkerDescription["path"sv].AsString();
    std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred();

    // Quote the executable path in case it contains spaces
    ExtendableWideStringBuilder<512> CommandLine;
    CommandLine.Append(L'"');
    CommandLine.Append(ExePath.c_str());
    CommandLine.Append(L'"');
    CommandLine.Append(L" -Build=build.action");

    LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
    LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
    BOOL bInheritHandles = FALSE;
    DWORD dwCreationFlags = 0;

    ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed);

    // CreateProcessW may modify the command-line buffer, so it must be writable
    // and NUL-terminated
    CommandLine.EnsureNulTerminated();

    PROCESS_INFORMATION ProcessInformation{};

    if (m_Sandboxed)
    {
        // Grant AppContainer access to sandbox and worker directories
        GrantAppContainerAccess(Prepared->SandboxPath, FILE_ALL_ACCESS);
        GrantAppContainerAccess(Prepared->WorkerPath, FILE_GENERIC_READ | FILE_GENERIC_EXECUTE);

        // Set up extended startup info with AppContainer security capabilities
        SECURITY_CAPABILITIES SecurityCapabilities{};
        SecurityCapabilities.AppContainerSid = m_AppContainerSid;
        SecurityCapabilities.Capabilities = nullptr;
        SecurityCapabilities.CapabilityCount = 0;

        // First call intentionally fails and reports the required buffer size
        SIZE_T AttrListSize = 0;
        InitializeProcThreadAttributeList(nullptr, 1, 0, &AttrListSize);

        auto AttrList = static_cast<PPROC_THREAD_ATTRIBUTE_LIST>(malloc(AttrListSize));
        auto $0 = MakeGuard([&] { free(AttrList); });

        if (!InitializeProcThreadAttributeList(AttrList, 1, 0, &AttrListSize))
        {
            zen::ThrowLastError("InitializeProcThreadAttributeList failed");
        }

        auto $1 = MakeGuard([&] { DeleteProcThreadAttributeList(AttrList); });

        if (!UpdateProcThreadAttribute(AttrList,
                                       0,
                                       PROC_THREAD_ATTRIBUTE_SECURITY_CAPABILITIES,
                                       &SecurityCapabilities,
                                       sizeof(SecurityCapabilities),
                                       nullptr,
                                       nullptr))
        {
            zen::ThrowLastError("UpdateProcThreadAttribute (SECURITY_CAPABILITIES) failed");
        }

        STARTUPINFOEXW StartupInfoEx{};
        StartupInfoEx.StartupInfo.cb = sizeof(STARTUPINFOEXW);
        StartupInfoEx.lpAttributeList = AttrList;

        dwCreationFlags |= EXTENDED_STARTUPINFO_PRESENT;

        BOOL Success = CreateProcessW(nullptr,
                                      CommandLine.Data(),
                                      lpProcessAttributes,
                                      lpThreadAttributes,
                                      bInheritHandles,
                                      dwCreationFlags,
                                      (LPVOID)EnvironmentBlock.Data(),
                                      Prepared->SandboxPath.c_str(),
                                      &StartupInfoEx.StartupInfo,
                                      /* out */ &ProcessInformation);

        if (!Success)
        {
            zen::ThrowLastError("Unable to launch sandboxed process");
        }
    }
    else
    {
        STARTUPINFO StartupInfo{};
        StartupInfo.cb = sizeof StartupInfo;

        BOOL Success = CreateProcessW(nullptr,
                                      CommandLine.Data(),
                                      lpProcessAttributes,
                                      lpThreadAttributes,
                                      bInheritHandles,
                                      dwCreationFlags,
                                      (LPVOID)EnvironmentBlock.Data(),
                                      Prepared->SandboxPath.c_str(),
                                      &StartupInfo,
                                      /* out */ &ProcessInformation);

        if (!Success)
        {
            zen::ThrowLastError("Unable to launch process");
        }
    }

    // We never need the primary thread handle; the process handle is kept for
    // exit-code polling and cancellation and is closed by the sweep/cancel paths
    CloseHandle(ProcessInformation.hThread);

    Ref<RunningAction> NewAction{new RunningAction()};
    NewAction->Action = Action;
    NewAction->ProcessHandle = ProcessInformation.hProcess;
    NewAction->SandboxPath = std::move(Prepared->SandboxPath);

    {
        RwLock::ExclusiveLockScope _(m_RunningLock);

        m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
    }

    Action->SetActionState(RunnerAction::State::Running);

    return SubmitResult{.IsAccepted = true};
}
+
+void
+WindowsProcessRunner::SweepRunningActions()
+{
+ ZEN_TRACE_CPU("WindowsProcessRunner::SweepRunningActions");
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Running = It->second;
+
+ DWORD ExitCode = 0;
+ BOOL IsSuccess = GetExitCodeProcess(Running->ProcessHandle, &ExitCode);
+
+ if (IsSuccess && ExitCode != STILL_ACTIVE)
+ {
+ CloseHandle(Running->ProcessHandle);
+ Running->ProcessHandle = INVALID_HANDLE_VALUE;
+ Running->ExitCode = ExitCode;
+
+ CompletedActions.push_back(std::move(Running));
+ It = m_RunningMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ });
+
+ ProcessCompletedActions(CompletedActions);
+}
+
/** Terminate all running child processes and mark their actions Failed.

    The running map is swapped out under the lock so no lock is held during
    termination or waiting. Sandbox directories are handed to the deferred
    deleter rather than removed synchronously.
 */
void
WindowsProcessRunner::CancelRunningActions()
{
    ZEN_TRACE_CPU("WindowsProcessRunner::CancelRunningActions");
    Stopwatch Timer;
    std::unordered_map<int, Ref<RunningAction>> RunningMap;

    m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });

    if (RunningMap.empty())
    {
        return;
    }

    ZEN_INFO("cancelling all running actions");

    // For expedience we initiate the process termination for all known
    // processes before attempting to wait for them to exit.

    for (const auto& Kv : RunningMap)
    {
        Ref<RunningAction> Running = Kv.second;

        BOOL TermSuccess = TerminateProcess(Running->ProcessHandle, 222);

        if (!TermSuccess)
        {
            DWORD LastError = GetLastError();

            // ERROR_ACCESS_DENIED typically means the process already exited
            if (LastError != ERROR_ACCESS_DENIED)
            {
                ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Running->Action->ActionLsn, GetSystemErrorAsString(LastError));
            }
        }
    }

    // Wait for all processes and clean up, regardless of whether TerminateProcess succeeded.

    for (auto& [Lsn, Running] : RunningMap)
    {
        if (Running->ProcessHandle != INVALID_HANDLE_VALUE)
        {
            // Bounded wait (2s) so a wedged process cannot stall shutdown forever
            DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000);

            if (WaitResult != WAIT_OBJECT_0)
            {
                ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult);
            }
            else
            {
                ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
            }

            CloseHandle(Running->ProcessHandle);
            Running->ProcessHandle = INVALID_HANDLE_VALUE;
        }

        m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
        Running->Action->SetActionState(RunnerAction::State::Failed);
    }

    ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
+
/** Request cancellation of a single running action by local LSN.

    @return true if a termination request was successfully issued; false if the
            LSN is unknown, the process already finished, or TerminateProcess
            failed. The sweep thread performs the actual state transition.
 */
bool
WindowsProcessRunner::CancelAction(int ActionLsn)
{
    ZEN_TRACE_CPU("WindowsProcessRunner::CancelAction");

    // Hold the shared lock while terminating to prevent the sweep thread from
    // closing the handle between our lookup and TerminateProcess call.
    bool Sent = false;

    m_RunningLock.WithSharedLock([&] {
        auto It = m_RunningMap.find(ActionLsn);
        if (It == m_RunningMap.end())
        {
            return;
        }

        Ref<RunningAction> Target = It->second;
        if (Target->ProcessHandle == INVALID_HANDLE_VALUE)
        {
            // Process already reaped; nothing to terminate
            return;
        }

        // Exit code 222 matches the bulk-cancel path in CancelRunningActions
        BOOL TermSuccess = TerminateProcess(Target->ProcessHandle, 222);

        if (!TermSuccess)
        {
            DWORD LastError = GetLastError();

            // ERROR_ACCESS_DENIED typically means the process already exited
            if (LastError != ERROR_ACCESS_DENIED)
            {
                ZEN_WARN("CancelAction: TerminateProcess for LSN {} not successful: {}", ActionLsn, GetSystemErrorAsString(LastError));
            }

            return;
        }

        ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn);
        Sent = true;
    });

    // The monitor thread will pick up the process exit and mark the action as Failed.
    return Sent;
}
+
+void
+WindowsProcessRunner::SampleProcessCpu(RunningAction& Running)
+{
+ FILETIME CreationTime, ExitTime, KernelTime, UserTime;
+ if (!GetProcessTimes(Running.ProcessHandle, &CreationTime, &ExitTime, &KernelTime, &UserTime))
+ {
+ return;
+ }
+
+ auto FtToU64 = [](FILETIME Ft) -> uint64_t { return (static_cast<uint64_t>(Ft.dwHighDateTime) << 32) | Ft.dwLowDateTime; };
+
+ // FILETIME values are in 100-nanosecond intervals
+ const uint64_t CurrentOsTicks = FtToU64(KernelTime) + FtToU64(UserTime);
+ const uint64_t NowTicks = GetHifreqTimerValue();
+
+ // Cumulative CPU seconds (absolute, available from first sample): 100ns → seconds
+ Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 10'000'000.0), std::memory_order_relaxed);
+
+ if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
+ {
+ const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(NowTicks - Running.LastCpuSampleTicks);
+ if (ElapsedMs > 0)
+ {
+ const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
+ // 100ns → ms: divide by 10000; then as percent of elapsed ms
+ const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 10000.0 / ElapsedMs * 100.0);
+ Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
+ }
+ }
+
+ Running.LastCpuSampleTicks = NowTicks;
+ Running.LastCpuOsTicks = CurrentOsTicks;
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/windowsrunner.h b/src/zencompute/runners/windowsrunner.h
new file mode 100644
index 000000000..9f2385cc4
--- /dev/null
+++ b/src/zencompute/runners/windowsrunner.h
@@ -0,0 +1,53 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_WINDOWS
+
+# include <zencore/windows.h>
+
+# include <string>
+
+namespace zen::compute {
+
+/** Windows process runner using CreateProcessW for executing worker executables.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and monitor thread infrastructure. Overrides only the
+ platform-specific methods: process spawning, sweep, and cancellation.
+
+ When Sandboxed is true, child processes are isolated using a Windows AppContainer:
+ no network access (AppContainer blocks network by default when no capabilities are
+ granted) and no filesystem access outside explicitly granted sandbox and worker
+ directories. This requires no elevation.
+ */
+class WindowsProcessRunner : public LocalProcessRunner
+{
+public:
+ WindowsProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ bool Sandboxed = false,
+ int32_t MaxConcurrentActions = 0);
+ ~WindowsProcessRunner();
+
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ void SweepRunningActions() override;
+ void CancelRunningActions() override;
+ bool CancelAction(int ActionLsn) override;
+ void SampleProcessCpu(RunningAction& Running) override;
+
+private:
+ // Adds an ACL entry granting the AppContainer SID the given access to Path.
+ void GrantAppContainerAccess(const std::filesystem::path& Path, DWORD AccessMask);
+
+ bool m_Sandboxed = false; // when true, children run inside an AppContainer
+ PSID m_AppContainerSid = nullptr; // SID of the AppContainer profile (owned; see .cpp)
+ std::wstring m_AppContainerName; // name used to create/derive the AppContainer profile
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
new file mode 100644
index 000000000..506bec73b
--- /dev/null
+++ b/src/zencompute/runners/winerunner.cpp
@@ -0,0 +1,237 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "winerunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_LINUX
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+# include <signal.h>
+# include <sys/wait.h>
+# include <unistd.h>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+WineProcessRunner::WineProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool)
+: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool)
+{
+ // zenserver/main.cpp installs SIG_IGN for SIGCHLD, which tells the kernel
+ // to auto-reap exited children. That would make our waitpid() calls return
+ // -1/ECHILD instead of delivering the exit status, so put the default
+ // SIGCHLD disposition back before any wine workers are spawned.
+ struct sigaction ChldAction = {};
+ ChldAction.sa_handler = SIG_DFL;
+ sigemptyset(&ChldAction.sa_mask);
+ sigaction(SIGCHLD, &ChldAction, nullptr);
+}
+
+SubmitResult
+WineProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ // Stages the action's sandbox/worker (shared LocalProcessRunner logic),
+ // then fork/execs `wine <worker_exe> -Build=build.action` with the sandbox
+ // as working directory. Returns IsAccepted=false when the action cannot be
+ // prepared; throws std::runtime_error if fork() itself fails.
+ ZEN_TRACE_CPU("WineProcessRunner::SubmitAction");
+ std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);
+
+ if (!Prepared)
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+
+ // Build environment array from worker descriptor
+
+ CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();
+
+ // EnvStrings owns the storage; Envp holds the char* view exec needs.
+ std::vector<std::string> EnvStrings;
+ for (auto& It : WorkerDescription["environment"sv])
+ {
+ EnvStrings.emplace_back(It.AsString());
+ }
+
+ std::vector<char*> Envp;
+ Envp.reserve(EnvStrings.size() + 1);
+ for (auto& Str : EnvStrings)
+ {
+ Envp.push_back(Str.data());
+ }
+ Envp.push_back(nullptr); // exec requires a NULL terminator
+
+ // Build argv: wine <worker_exe_path> -Build=build.action
+
+ std::string_view ExecPath = WorkerDescription["path"sv].AsString();
+ std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath);
+ std::string ExePathStr = ExePath.string();
+ std::string WinePathStr = m_WinePath;
+ std::string BuildArg = "-Build=build.action";
+
+ std::vector<char*> ArgV;
+ ArgV.push_back(WinePathStr.data());
+ ArgV.push_back(ExePathStr.data());
+ ArgV.push_back(BuildArg.data());
+ ArgV.push_back(nullptr); // exec requires a NULL terminator
+
+ ZEN_DEBUG("Executing via Wine: {} {} {}", WinePathStr, ExePathStr, BuildArg);
+
+ std::string SandboxPathStr = Prepared->SandboxPath.string();
+
+ pid_t ChildPid = fork();
+
+ if (ChildPid < 0)
+ {
+ throw std::runtime_error(fmt::format("fork() failed: {}", strerror(errno)));
+ }
+
+ if (ChildPid == 0)
+ {
+ // Child process: run from inside the sandbox so relative paths in the
+ // action (e.g. build.action) resolve there.
+ if (chdir(SandboxPathStr.c_str()) != 0)
+ {
+ _exit(127); // conventional "cannot exec" exit code
+ }
+
+ // Use execvpe (glibc/musl extension; this file is Linux-only) rather than
+ // execve: m_WinePath defaults to "wine" with no slash, and execve does not
+ // perform PATH lookup — after the chdir above it would resolve to
+ // ./wine inside the sandbox and always fail. execvpe searches PATH while
+ // still passing the worker-provided environment to the new process.
+ execvpe(WinePathStr.c_str(), ArgV.data(), Envp.data());
+
+ // exec only returns on failure
+ _exit(127);
+ }
+
+ // Parent: store child pid as void* (same convention as zencore/process.cpp)
+
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = reinterpret_cast<void*>(static_cast<intptr_t>(ChildPid));
+ NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+ m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
+ }
+
+ Action->SetActionState(RunnerAction::State::Running);
+
+ return SubmitResult{.IsAccepted = true};
+}
+
+void
+WineProcessRunner::SweepRunningActions()
+{
+ // Polls every tracked child with a non-blocking waitpid() and collects the
+ // ones that have exited (or can no longer be waited on). Completion
+ // processing happens outside the lock via ProcessCompletedActions().
+ ZEN_TRACE_CPU("WineProcessRunner::SweepRunningActions");
+ std::vector<Ref<RunningAction>> CompletedActions;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+ {
+ Ref<RunningAction> Running = It->second;
+
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+ int Status = 0;
+
+ pid_t Result = waitpid(Pid, &Status, WNOHANG);
+
+ if (Result == 0)
+ {
+ // Child still running; check again on the next sweep.
+ ++It;
+ continue;
+ }
+
+ if (Result == Pid)
+ {
+ if (WIFEXITED(Status))
+ {
+ Running->ExitCode = WEXITSTATUS(Status);
+ }
+ else if (WIFSIGNALED(Status))
+ {
+ // Shell convention for signal-terminated processes: 128 + signo
+ Running->ExitCode = 128 + WTERMSIG(Status);
+ }
+ else
+ {
+ Running->ExitCode = 1;
+ }
+ }
+ else
+ {
+ // waitpid() failed (e.g. -1/ECHILD if the child was reaped elsewhere).
+ // Previously such entries were left in m_RunningMap forever, so the
+ // action never completed and its sandbox was never reclaimed. Treat
+ // the process as gone and fail the action instead.
+ ZEN_WARN("SweepRunningActions: waitpid for LSN {} (pid {}) failed: {}", Running->Action->ActionLsn, Pid, strerror(errno));
+ Running->ExitCode = 1;
+ }
+
+ Running->ProcessHandle = nullptr;
+
+ CompletedActions.push_back(std::move(Running));
+ It = m_RunningMap.erase(It);
+ }
+ });
+
+ ProcessCompletedActions(CompletedActions);
+}
+
+void
+WineProcessRunner::CancelRunningActions()
+{
+ // Terminates every tracked child process: SIGTERM first, then up to a
+ // 2-second grace period per process, escalating to SIGKILL. Each action is
+ // marked Failed and its sandbox handed to the deferred deleter.
+ ZEN_TRACE_CPU("WineProcessRunner::CancelRunningActions");
+ Stopwatch Timer;
+ std::unordered_map<int, Ref<RunningAction>> RunningMap;
+
+ // Swap the map out under the lock so termination/waiting happens without
+ // holding m_RunningLock (and new submissions see an empty map).
+ m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+ if (RunningMap.empty())
+ {
+ return;
+ }
+
+ ZEN_INFO("cancelling all running actions");
+
+ // Send SIGTERM to all running processes first
+
+ for (const auto& [Lsn, Running] : RunningMap)
+ {
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+
+ if (kill(Pid, SIGTERM) != 0)
+ {
+ ZEN_WARN("kill(SIGTERM) for LSN {} (pid {}) failed: {}", Running->Action->ActionLsn, Pid, strerror(errno));
+ }
+ }
+
+ // Wait for all processes, regardless of whether SIGTERM succeeded, then clean up.
+
+ for (auto& [Lsn, Running] : RunningMap)
+ {
+ pid_t Pid = static_cast<pid_t>(reinterpret_cast<intptr_t>(Running->ProcessHandle));
+
+ // Poll for up to 2 seconds
+ bool Exited = false;
+ for (int i = 0; i < 20; ++i)
+ {
+ int Status = 0;
+ pid_t WaitResult = waitpid(Pid, &Status, WNOHANG);
+ if (WaitResult == Pid)
+ {
+ Exited = true;
+ ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
+ break;
+ }
+ usleep(100000); // 100ms
+ }
+
+ if (!Exited)
+ {
+ // Escalate: SIGKILL cannot be caught, then block until the child is reaped.
+ ZEN_WARN("LSN {}: process did not exit after SIGTERM, sending SIGKILL", Running->Action->ActionLsn);
+ kill(Pid, SIGKILL);
+ waitpid(Pid, nullptr, 0);
+ }
+
+ // Sandbox removal is deferred; the action is reported as Failed since it
+ // was cancelled rather than run to completion.
+ m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+
+ ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/winerunner.h b/src/zencompute/runners/winerunner.h
new file mode 100644
index 000000000..7df62e7c0
--- /dev/null
+++ b/src/zencompute/runners/winerunner.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES && ZEN_PLATFORM_LINUX
+
+# include <string>
+
+namespace zen::compute {
+
+/** Wine-based process runner for executing Windows worker executables on Linux.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and monitor thread infrastructure. Overrides only the
+ platform-specific methods: process spawning, sweep, and cancellation.
+ */
+class WineProcessRunner : public LocalProcessRunner
+{
+public:
+ WineProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool);
+
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ void SweepRunningActions() override;
+ void CancelRunningActions() override;
+
+private:
+ // Command used to launch workers; defaults to "wine". NOTE(review): not
+ // currently configurable from the visible code — confirm whether callers
+ // are expected to rely on the default or set an absolute path.
+ std::string m_WinePath = "wine";
+};
+
+} // namespace zen::compute
+
+#endif