aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners')
-rw-r--r--src/zencompute/runners/functionrunner.cpp132
-rw-r--r--src/zencompute/runners/functionrunner.h27
-rw-r--r--src/zencompute/runners/linuxrunner.cpp22
-rw-r--r--src/zencompute/runners/localrunner.cpp40
-rw-r--r--src/zencompute/runners/localrunner.h4
-rw-r--r--src/zencompute/runners/macrunner.cpp18
-rw-r--r--src/zencompute/runners/managedrunner.cpp279
-rw-r--r--src/zencompute/runners/managedrunner.h64
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp366
-rw-r--r--src/zencompute/runners/remotehttprunner.h14
-rw-r--r--src/zencompute/runners/windowsrunner.cpp105
-rw-r--r--src/zencompute/runners/windowsrunner.h1
-rw-r--r--src/zencompute/runners/winerunner.cpp6
13 files changed, 864 insertions, 214 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 4f116e7d8..34bf065b4 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,9 +6,15 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/string.h>
+# include <zencore/timer.h>
# include <zencore/trace.h>
+# include <zencore/workthreadpool.h>
# include <fmt/format.h>
+# include <future>
# include <vector>
namespace zen::compute {
@@ -118,23 +124,34 @@ std::vector<SubmitResult>
BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
- RwLock::SharedLockScope _(m_RunnersLock);
- const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ // Snapshot runners and query capacity under the lock, then release
+ // before submitting - HTTP submissions to remote runners can take
+ // hundreds of milliseconds and we must not hold m_RunnersLock during I/O.
- if (RunnerCount == 0)
- {
- return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
- }
+ std::vector<Ref<FunctionRunner>> Runners;
+ std::vector<size_t> Capacities;
+ std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions;
+ size_t TotalCapacity = 0;
- // Query capacity per runner and compute total
- std::vector<size_t> Capacities(RunnerCount);
- size_t TotalCapacity = 0;
+ m_RunnersLock.WithSharedLock([&] {
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ Runners.assign(m_Runners.begin(), m_Runners.end());
+ Capacities.resize(RunnerCount);
+ PerRunnerActions.resize(RunnerCount);
- for (int i = 0; i < RunnerCount; ++i)
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ Capacities[i] = Runners[i]->QueryCapacity();
+ TotalCapacity += Capacities[i];
+ }
+ });
+
+ const int RunnerCount = gsl::narrow<int>(Runners.size());
+
+ if (RunnerCount == 0)
{
- Capacities[i] = m_Runners[i]->QueryCapacity();
- TotalCapacity += Capacities[i];
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
}
if (TotalCapacity == 0)
@@ -143,9 +160,8 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
// Distribute actions across runners proportionally to their available capacity
- std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
- std::vector<size_t> ActionRunnerIndex(Actions.size());
- size_t ActionIdx = 0;
+ std::vector<size_t> ActionRunnerIndex(Actions.size());
+ size_t ActionIdx = 0;
for (int i = 0; i < RunnerCount; ++i)
{
@@ -164,8 +180,9 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Assign any remaining actions to runners with capacity (round-robin)
- for (int i = 0; ActionIdx < Actions.size(); i = (i + 1) % RunnerCount)
+ // Assign any remaining actions to runners with capacity (round-robin).
+ // Cap at TotalCapacity to avoid spinning when there are more actions than runners can accept.
+ for (int i = 0; ActionIdx < Actions.size() && ActionIdx < TotalCapacity; i = (i + 1) % RunnerCount)
{
if (Capacities[i] > PerRunnerActions[i].size())
{
@@ -175,22 +192,83 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Submit batches per runner
+ // Submit batches per runner - in parallel when a worker pool is available
+
std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+ int ActiveRunnerCount = 0;
for (int i = 0; i < RunnerCount; ++i)
{
if (!PerRunnerActions[i].empty())
{
- PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+ ++ActiveRunnerCount;
+ }
+ }
+
+ static constexpr uint64_t SubmitWarnThresholdMs = 500;
+
+ auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
+ Stopwatch Timer;
+
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
+
+ uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
+ if (ElapsedMs >= SubmitWarnThresholdMs)
+ {
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
+ PerRunnerActions[RunnerIndex].size(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
+ ElapsedMs);
+ }
+ };
+
+ if (m_WorkerPool && ActiveRunnerCount > 1)
+ {
+ std::vector<std::future<void>> Futures(RunnerCount);
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ std::packaged_task<void()> Task([&SubmitToRunner, i]() { SubmitToRunner(i); });
+
+ Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog);
+ }
+ }
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (Futures[i].valid())
+ {
+ Futures[i].get();
+ }
+ }
+ }
+ else
+ {
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ SubmitToRunner(i);
+ }
}
}
- // Reassemble results in original action order
- std::vector<SubmitResult> Results(Actions.size());
+ // Reassemble results in original action order.
+ // Actions beyond ActionIdx were not assigned to any runner (insufficient capacity).
+ std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"});
std::vector<size_t> PerRunnerIdx(RunnerCount, 0);
- for (size_t i = 0; i < Actions.size(); ++i)
+ for (size_t i = 0; i < ActionIdx; ++i)
{
size_t RunnerIdx = ActionRunnerIndex[i];
size_t Idx = PerRunnerIdx[RunnerIdx]++;
@@ -307,10 +385,11 @@ RunnerAction::RetractAction()
bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed, Abandoned, or Retracted states
+ // Only allow reset from Failed, Abandoned, Rejected, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Rejected &&
+ CurrentState != State::Retracted)
{
return false;
}
@@ -331,11 +410,12 @@ RunnerAction::ResetActionStateToPending()
// Clear execution fields
ExecutionLocation.clear();
+ FailureReason.clear();
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count (skip for Retracted — nothing failed)
- if (CurrentState != State::Retracted)
+ // Increment retry count (skip for Retracted/Rejected - nothing failed)
+ if (CurrentState != State::Retracted && CurrentState != State::Rejected)
{
RetryCount.fetch_add(1, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index 56c3f3af0..371a60b7a 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -10,6 +10,10 @@
# include <filesystem>
# include <vector>
+namespace zen {
+class WorkerThreadPool;
+}
+
namespace zen::compute {
struct SubmitResult
@@ -37,6 +41,22 @@ public:
[[nodiscard]] virtual bool IsHealthy() = 0;
[[nodiscard]] virtual size_t QueryCapacity();
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; }
+
+ // Accumulated stats from the most recent SubmitActions call.
+ // Reset before each call, populated by the runner implementation.
+ struct SubmitStats
+ {
+ std::atomic<size_t> TotalAttachments{0};
+ std::atomic<uint64_t> TotalAttachmentBytes{0};
+
+ void Reset()
+ {
+ TotalAttachments.store(0, std::memory_order_relaxed);
+ TotalAttachmentBytes.store(0, std::memory_order_relaxed);
+ }
+ };
+ SubmitStats m_LastSubmitStats;
// Best-effort cancellation of a specific in-flight action. Returns true if the
// cancellation signal was successfully sent. The action will transition to Cancelled
@@ -68,6 +88,8 @@ public:
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
+ void SetWorkerPool(WorkerThreadPool* Pool) { m_WorkerPool = Pool; }
+
size_t GetRunnerCount()
{
return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); });
@@ -79,6 +101,7 @@ protected:
RwLock m_RunnersLock;
std::vector<Ref<FunctionRunner>> m_Runners;
std::atomic<int> m_NextSubmitIndex{0};
+ WorkerThreadPool* m_WorkerPool = nullptr;
};
/** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal.
@@ -151,6 +174,7 @@ struct RunnerAction : public RefCounted
CbObject ActionObj;
int Priority = 0;
std::string ExecutionLocation; // "local" or remote hostname
+ std::string FailureReason; // human-readable reason when action fails (empty on success)
// CPU usage and total CPU time of the running process, sampled periodically by the local runner.
// CpuUsagePercent: -1.0 means not yet sampled; >=0.0 is the most recent reading as a percentage.
@@ -168,6 +192,7 @@ struct RunnerAction : public RefCounted
Completed, // Finished successfully with results available
Failed, // Execution failed (transient error, eligible for retry)
Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon)
+ Rejected, // Runner declined (e.g. at capacity) - rescheduled without retry cost
Cancelled, // Intentional user cancellation (never retried)
Retracted, // Pulled back for rescheduling on a different runner (no retry cost)
_Count
@@ -194,6 +219,8 @@ struct RunnerAction : public RefCounted
return "Failed";
case State::Abandoned:
return "Abandoned";
+ case State::Rejected:
+ return "Rejected";
case State::Cancelled:
return "Cancelled";
case State::Retracted:
diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp
index e79a6c90f..be4274823 100644
--- a/src/zencompute/runners/linuxrunner.cpp
+++ b/src/zencompute/runners/linuxrunner.cpp
@@ -195,7 +195,7 @@ namespace {
WriteErrorAndExit(ErrorPipeFd, "bind mount /lib failed", errno);
}
- // /lib64 (optional — not all distros have it)
+ // /lib64 (optional - not all distros have it)
{
struct stat St;
if (stat("/lib64", &St) == 0 && S_ISDIR(St.st_mode))
@@ -208,7 +208,7 @@ namespace {
}
}
- // /etc (required — for resolv.conf, ld.so.cache, etc.)
+ // /etc (required - for resolv.conf, ld.so.cache, etc.)
if (MkdirIfNeeded(BuildPath("etc"), 0755) != 0)
{
WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/etc failed", errno);
@@ -218,7 +218,7 @@ namespace {
WriteErrorAndExit(ErrorPipeFd, "bind mount /etc failed", errno);
}
- // /worker — bind-mount worker directory (contains the executable)
+ // /worker - bind-mount worker directory (contains the executable)
if (MkdirIfNeeded(BuildPath("worker"), 0755) != 0)
{
WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/worker failed", errno);
@@ -331,6 +331,8 @@ LinuxProcessRunner::LinuxProcessRunner(ChunkResolver& Resolver,
{
ZEN_INFO("namespace sandboxing enabled for child processes");
}
+
+ StartMonitorThread();
}
SubmitResult
@@ -428,11 +430,12 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
- // Close read end of error pipe — child only writes
+ // Close read end of error pipe - child only writes
close(ErrorPipe[0]);
SetupNamespaceSandbox(SandboxPathStr.c_str(), CurrentUid, CurrentGid, WorkerPathStr.c_str(), ErrorPipe[1]);
@@ -459,7 +462,7 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (m_Sandboxed)
{
- // Close write end of error pipe — parent only reads
+ // Close write end of error pipe - parent only reads
close(ErrorPipe[1]);
// Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
@@ -479,7 +482,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
@@ -675,7 +679,7 @@ ReadProcStatCpuTicks(pid_t Pid)
Buf[Len] = '\0';
- // Skip past "pid (name) " — find last ')' to handle names containing spaces or parens
+ // Skip past "pid (name) " - find last ')' to handle names containing spaces or parens
const char* P = strrchr(Buf, ')');
if (!P)
{
@@ -705,7 +709,7 @@ LinuxProcessRunner::SampleProcessCpu(RunningAction& Running)
if (CurrentOsTicks == 0)
{
- // Process gone or /proc entry unreadable — record timestamp without updating usage
+ // Process gone or /proc entry unreadable - record timestamp without updating usage
Running.LastCpuSampleTicks = NowTicks;
Running.LastCpuOsTicks = 0;
return;
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index b61e0a46f..259965e23 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -4,6 +4,8 @@
#if ZEN_WITH_COMPUTE_SERVICES
+# include "pathvalidation.h"
+
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/compactbinarypackage.h>
@@ -104,8 +106,6 @@ LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver,
ZEN_INFO("Cleanup complete");
}
- m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
-
# if ZEN_PLATFORM_WINDOWS
// Suppress any error dialogs caused by missing dependencies
UINT OldMode = ::SetErrorMode(0);
@@ -337,7 +337,7 @@ LocalProcessRunner::PrepareActionSubmission(Ref<RunnerAction> Action)
SubmitResult
LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
- // Base class is not directly usable — platform subclasses override this
+ // Base class is not directly usable - platform subclasses override this
ZEN_UNUSED(Action);
return SubmitResult{.IsAccepted = false};
}
@@ -357,14 +357,21 @@ LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
- if (!std::filesystem::exists(WorkerDir))
+ // worker.zcb is written as the last step of ManifestWorker, so its presence
+ // indicates a complete manifest. If the directory exists but the marker is
+ // missing, a previous manifest was interrupted and we need to start over.
+ bool NeedsManifest = !std::filesystem::exists(WorkerDir / "worker.zcb");
+
+ if (NeedsManifest)
{
_.ReleaseNow();
RwLock::ExclusiveLockScope $(m_WorkerLock);
- if (!std::filesystem::exists(WorkerDir))
+ if (!std::filesystem::exists(WorkerDir / "worker.zcb"))
{
+ std::error_code Ec;
+ std::filesystem::remove_all(WorkerDir, Ec);
ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
}
}
@@ -382,6 +389,8 @@ LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromP
const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ ValidateSandboxRelativePath(Name);
+
CompressedBuffer Compressed;
if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
@@ -457,7 +466,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
for (auto& It : WorkerDescription["dirs"sv])
{
- std::string_view Name = It.AsString();
+ std::string_view Name = It.AsString();
+ ValidateSandboxRelativePath(Name);
std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()};
// Validate dir path stays within sandbox
@@ -482,6 +492,8 @@ LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
}
WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer());
+
+ ZEN_INFO("manifested worker '{}' in '{}'", WorkerPackage.GetObjectHash(), SandboxPath);
}
CbPackage
@@ -540,6 +552,12 @@ LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
}
void
+LocalProcessRunner::StartMonitorThread()
+{
+ m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+}
+
+void
LocalProcessRunner::MonitorThreadFunction()
{
SetCurrentThreadName("LocalProcessRunner_Monitor");
@@ -602,7 +620,7 @@ LocalProcessRunner::MonitorThreadFunction()
void
LocalProcessRunner::CancelRunningActions()
{
- // Base class is not directly usable — platform subclasses override this
+ // Base class is not directly usable - platform subclasses override this
}
void
@@ -662,9 +680,15 @@ LocalProcessRunner::ProcessCompletedActions(std::vector<Ref<RunningAction>>& Com
}
catch (std::exception& Ex)
{
- ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+ Running->Action->FailureReason = fmt::format("exception gathering outputs: {}", Ex.what());
+ ZEN_ERROR("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
}
}
+ else
+ {
+ Running->Action->FailureReason = fmt::format("process exited with code {}", Running->ExitCode);
+ ZEN_WARN("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
+ }
// Failed - clean up the sandbox in the background.
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
index b8cff6826..d6589db43 100644
--- a/src/zencompute/runners/localrunner.h
+++ b/src/zencompute/runners/localrunner.h
@@ -67,6 +67,7 @@ protected:
{
Ref<RunnerAction> Action;
void* ProcessHandle = nullptr;
+ int Pid = 0;
int ExitCode = 0;
std::filesystem::path SandboxPath;
@@ -83,8 +84,6 @@ protected:
std::filesystem::path m_SandboxPath;
int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
- // if used in conjuction with m_ResultsLock, this lock must be taken *after*
- // m_ResultsLock to avoid deadlocks
RwLock m_RunningLock;
std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
@@ -95,6 +94,7 @@ protected:
std::thread m_MonitorThread;
std::atomic<bool> m_MonitorThreadEnabled{true};
Event m_MonitorThreadEvent;
+ void StartMonitorThread();
void MonitorThreadFunction();
virtual void SweepRunningActions();
virtual void CancelRunningActions();
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
index 5cec90699..ab24d4672 100644
--- a/src/zencompute/runners/macrunner.cpp
+++ b/src/zencompute/runners/macrunner.cpp
@@ -130,6 +130,8 @@ MacProcessRunner::MacProcessRunner(ChunkResolver& Resolver,
{
ZEN_INFO("Seatbelt sandboxing enabled for child processes");
}
+
+ StartMonitorThread();
}
SubmitResult
@@ -209,18 +211,19 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
- // Close read end of error pipe — child only writes
+ // Close read end of error pipe - child only writes
close(ErrorPipe[0]);
// Apply Seatbelt sandbox profile
char* ErrorBuf = nullptr;
if (sandbox_init(SandboxProfile.c_str(), 0, &ErrorBuf) != 0)
{
- // sandbox_init failed — write error to pipe and exit
+ // sandbox_init failed - write error to pipe and exit
if (ErrorBuf)
{
WriteErrorAndExit(ErrorPipe[1], ErrorBuf, 0);
@@ -259,7 +262,7 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (m_Sandboxed)
{
- // Close write end of error pipe — parent only reads
+ // Close write end of error pipe - parent only reads
close(ErrorPipe[1]);
// Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
@@ -279,7 +282,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
@@ -467,7 +471,7 @@ MacProcessRunner::SampleProcessCpu(RunningAction& Running)
const uint64_t CurrentOsTicks = Info.pti_total_user + Info.pti_total_system;
const uint64_t NowTicks = GetHifreqTimerValue();
- // Cumulative CPU seconds (absolute, available from first sample): ns → seconds
+ // Cumulative CPU seconds (absolute, available from first sample): ns -> seconds
Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 1'000'000'000.0), std::memory_order_relaxed);
if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
@@ -476,7 +480,7 @@ MacProcessRunner::SampleProcessCpu(RunningAction& Running)
if (ElapsedMs > 0)
{
const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
- // ns → ms: divide by 1,000,000; then as percent of elapsed ms
+ // ns -> ms: divide by 1,000,000; then as percent of elapsed ms
const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 1'000'000.0 / ElapsedMs * 100.0);
Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp
new file mode 100644
index 000000000..a4f586852
--- /dev/null
+++ b/src/zencompute/runners/managedrunner.cpp
@@ -0,0 +1,279 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "managedrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/except_fmt.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <asio/io_context.hpp>
+# include <asio/executor_work_guard.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+ManagedProcessRunner::ManagedProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ int32_t MaxConcurrentActions)
+: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
+, m_IoContext(std::make_unique<asio::io_context>())
+, m_SubprocessManager(std::make_unique<SubprocessManager>(*m_IoContext))
+{
+ m_ProcessGroup = m_SubprocessManager->CreateGroup("compute-workers");
+
+ // Run the io_context on a small thread pool so that exit callbacks and
+ // metrics sampling are dispatched without blocking each other.
+ for (int i = 0; i < kIoThreadCount; ++i)
+ {
+ m_IoThreads.emplace_back([this, i] {
+ SetCurrentThreadName(fmt::format("mrunner_{}", i));
+
+ // work_guard keeps run() alive even when there is no pending work yet
+ auto WorkGuard = asio::make_work_guard(*m_IoContext);
+
+ m_IoContext->run();
+ });
+ }
+}
+
+ManagedProcessRunner::~ManagedProcessRunner()
+{
+ try
+ {
+ Shutdown();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception during managed process runner shutdown: {}", Ex.what());
+ }
+}
+
+void
+ManagedProcessRunner::Shutdown()
+{
+ ZEN_TRACE_CPU("ManagedProcessRunner::Shutdown");
+ m_AcceptNewActions = false;
+
+ CancelRunningActions();
+
+ // Tear down the SubprocessManager before stopping the io_context so that
+ // any in-flight callbacks are drained cleanly.
+ if (m_SubprocessManager)
+ {
+ m_SubprocessManager->DestroyGroup("compute-workers");
+ m_ProcessGroup = nullptr;
+ m_SubprocessManager.reset();
+ }
+
+ if (m_IoContext)
+ {
+ m_IoContext->stop();
+ }
+
+ for (std::thread& Thread : m_IoThreads)
+ {
+ if (Thread.joinable())
+ {
+ Thread.join();
+ }
+ }
+ m_IoThreads.clear();
+}
+
+SubmitResult
+ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ ZEN_TRACE_CPU("ManagedProcessRunner::SubmitAction");
+ std::optional<PreparedAction> Prepared = PrepareActionSubmission(Action);
+
+ if (!Prepared)
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+
+ CbObject WorkerDescription = Prepared->WorkerPackage.GetObject();
+
+ // Parse environment variables from worker descriptor ("KEY=VALUE" strings)
+ // into the key-value pairs expected by CreateProcOptions.
+ std::vector<std::pair<std::string, std::string>> EnvPairs;
+ for (auto& It : WorkerDescription["environment"sv])
+ {
+ std::string_view Str = It.AsString();
+ size_t Eq = Str.find('=');
+ if (Eq != std::string_view::npos)
+ {
+ EnvPairs.emplace_back(std::string(Str.substr(0, Eq)), std::string(Str.substr(Eq + 1)));
+ }
+ }
+
+ // Build command line
+ std::string_view ExecPath = WorkerDescription["path"sv].AsString();
+ std::filesystem::path ExePath = Prepared->WorkerPath / std::filesystem::path(ExecPath).make_preferred();
+
+ std::string CommandLine = fmt::format("\"{}\" -Build=build.action"sv, ExePath.string());
+
+ ZEN_DEBUG("Executing (managed): '{}' (sandbox='{}')", CommandLine, Prepared->SandboxPath);
+
+ CreateProcOptions Options;
+ Options.WorkingDirectory = &Prepared->SandboxPath;
+ Options.Flags = CreateProcOptions::Flag_NoConsole | CreateProcOptions::Flag_BelowNormalPriority;
+ Options.Environment = std::move(EnvPairs);
+
+ const int32_t ActionLsn = Prepared->ActionLsn;
+
+ ManagedProcess* Proc = nullptr;
+
+ try
+ {
+ Proc = m_ProcessGroup->Spawn(ExePath, CommandLine, Options, [this, ActionLsn](ManagedProcess& /*Process*/, int ExitCode) {
+ OnProcessExit(ActionLsn, ExitCode);
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to spawn process for action LSN {}: {}", ActionLsn, Ex.what());
+ m_DeferredDeleter.Enqueue(ActionLsn, std::move(Prepared->SandboxPath));
+ return SubmitResult{.IsAccepted = false};
+ }
+
+ {
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = static_cast<void*>(Proc);
+ NewAction->Pid = Proc->Pid();
+ NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+ m_RunningMap[ActionLsn] = std::move(NewAction);
+ }
+
+ Action->SetActionState(RunnerAction::State::Running);
+
+ ZEN_DEBUG("Managed runner: action LSN {} -> PID {}", ActionLsn, Proc->Pid());
+
+ return SubmitResult{.IsAccepted = true};
+}
+
+void
+ManagedProcessRunner::OnProcessExit(int ActionLsn, int ExitCode)
+{
+ ZEN_TRACE_CPU("ManagedProcessRunner::OnProcessExit");
+
+ Ref<RunningAction> Running;
+
+ m_RunningLock.WithExclusiveLock([&] {
+ auto It = m_RunningMap.find(ActionLsn);
+ if (It != m_RunningMap.end())
+ {
+ Running = std::move(It->second);
+ m_RunningMap.erase(It);
+ }
+ });
+
+ if (!Running)
+ {
+ return;
+ }
+
+ ZEN_DEBUG("Managed runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"), ActionLsn, Running->Pid, ExitCode);
+
+ Running->ExitCode = ExitCode;
+
+ // Capture final CPU metrics from the managed process before it is removed.
+ auto* Proc = static_cast<ManagedProcess*>(Running->ProcessHandle);
+ if (Proc)
+ {
+ ProcessMetrics Metrics = Proc->GetLatestMetrics();
+ float CpuMs = static_cast<float>(Metrics.UserTimeMs + Metrics.KernelTimeMs);
+ Running->Action->CpuSeconds.store(CpuMs / 1000.0f, std::memory_order_relaxed);
+
+ float CpuPct = Proc->GetCpuUsagePercent();
+ if (CpuPct >= 0.0f)
+ {
+ Running->Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
+ }
+ }
+
+ Running->ProcessHandle = nullptr;
+
+ std::vector<Ref<RunningAction>> CompletedActions;
+ CompletedActions.push_back(std::move(Running));
+ ProcessCompletedActions(CompletedActions);
+}
+
+void
+ManagedProcessRunner::CancelRunningActions()
+{
+ ZEN_TRACE_CPU("ManagedProcessRunner::CancelRunningActions");
+
+ std::unordered_map<int, Ref<RunningAction>> RunningMap;
+ m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+ if (RunningMap.empty())
+ {
+ return;
+ }
+
+ ZEN_INFO("cancelling {} running actions via process group", RunningMap.size());
+
+ Stopwatch Timer;
+
+ // Kill all processes in the group atomically (TerminateJobObject on Windows,
+ // SIGTERM+SIGKILL on POSIX).
+ if (m_ProcessGroup)
+ {
+ m_ProcessGroup->KillAll();
+ }
+
+ for (auto& [Lsn, Running] : RunningMap)
+ {
+ m_DeferredDeleter.Enqueue(Running->Action->ActionLsn, std::move(Running->SandboxPath));
+ Running->Action->SetActionState(RunnerAction::State::Failed);
+ }
+
+ ZEN_INFO("DONE - cancelled {} running processes (took {})", RunningMap.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+bool
+ManagedProcessRunner::CancelAction(int ActionLsn)
+{
+ ZEN_TRACE_CPU("ManagedProcessRunner::CancelAction");
+
+ ManagedProcess* Proc = nullptr;
+
+ m_RunningLock.WithSharedLock([&] {
+ auto It = m_RunningMap.find(ActionLsn);
+ if (It != m_RunningMap.end() && It->second->ProcessHandle != nullptr)
+ {
+ Proc = static_cast<ManagedProcess*>(It->second->ProcessHandle);
+ }
+ });
+
+ if (!Proc)
+ {
+ return false;
+ }
+
+ // Terminate the process. The exit callback will handle the rest
+ // (remove from running map, gather outputs or mark failed).
+ Proc->Terminate(222);
+
+ ZEN_DEBUG("CancelAction: initiated cancellation of LSN {}", ActionLsn);
+ return true;
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/managedrunner.h b/src/zencompute/runners/managedrunner.h
new file mode 100644
index 000000000..21a44d43c
--- /dev/null
+++ b/src/zencompute/runners/managedrunner.h
@@ -0,0 +1,64 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenutil/process/subprocessmanager.h>
+
+# include <memory>
+
+namespace asio {
+class io_context;
+}
+
+namespace zen::compute {
+
+/** Cross-platform process runner backed by SubprocessManager.
+
+ Subclasses LocalProcessRunner, reusing sandbox management, worker manifesting,
+ input/output handling, and shared action preparation. Replaces the polling-based
+ monitor thread with async exit callbacks driven by SubprocessManager, and
+ delegates CPU/memory metrics sampling to the manager's built-in round-robin
+ sampler.
+
+ A ProcessGroup (backed by a JobObject on Windows, process group on POSIX) is
+ used for bulk cancellation on shutdown.
+
+ This runner does not perform any platform-specific sandboxing (AppContainer,
+ namespaces, Seatbelt). It is intended as a simpler, cross-platform alternative
+ to the platform-specific runners for non-sandboxed workloads.
+ */
+class ManagedProcessRunner : public LocalProcessRunner
+{
+public:
+ ManagedProcessRunner(ChunkResolver& Resolver,
+ const std::filesystem::path& BaseDir,
+ DeferredDirectoryDeleter& Deleter,
+ WorkerThreadPool& WorkerPool,
+ int32_t MaxConcurrentActions = 0);
+ ~ManagedProcessRunner();
+
+ void Shutdown() override;
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ void CancelRunningActions() override;
+ bool CancelAction(int ActionLsn) override;
+ [[nodiscard]] bool IsHealthy() override { return true; }
+
+private:
+ static constexpr int kIoThreadCount = 4;
+
+ // Exit callback posted on an io_context thread.
+ void OnProcessExit(int ActionLsn, int ExitCode);
+
+ std::unique_ptr<asio::io_context> m_IoContext;
+ std::unique_ptr<SubprocessManager> m_SubprocessManager;
+ ProcessGroup* m_ProcessGroup = nullptr;
+ std::vector<std::thread> m_IoThreads;
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index ce6a81173..08f381b7f 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -20,6 +20,7 @@
# include <zenstore/cidstore.h>
# include <span>
+# include <unordered_set>
//////////////////////////////////////////////////////////////////////////
@@ -38,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
+, m_DisplayName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
@@ -59,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+void
+RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname)
+{
+ if (!Hostname.empty())
+ {
+ m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname);
+ }
+}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
Shutdown();
@@ -108,6 +119,7 @@ RemoteHttpRunner::Shutdown()
for (auto& [RemoteLsn, HttpAction] : Remaining)
{
ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+ HttpAction.Action->FailureReason = "remote runner shutdown";
HttpAction.Action->SetActionState(RunnerAction::State::Failed);
}
}
@@ -213,11 +225,13 @@ RemoteHttpRunner::QueryCapacity()
return 0;
}
- // Estimate how much more work we're ready to accept
+ // Estimate how much more work we're ready to accept.
+ // Include actions currently being submitted over HTTP so we don't
+ // keep queueing new submissions while previous ones are still in flight.
RwLock::SharedLockScope _{m_RunningLock};
- size_t RunningCount = m_RemoteRunningMap.size();
+ size_t RunningCount = m_RemoteRunningMap.size() + m_InFlightSubmissions.load(std::memory_order_relaxed);
if (RunningCount >= size_t(m_MaxRunningActions))
{
@@ -232,6 +246,9 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions");
+ m_InFlightSubmissions.fetch_add(Actions.size(), std::memory_order_relaxed);
+ auto InFlightGuard = MakeGuard([&] { m_InFlightSubmissions.fetch_sub(Actions.size(), std::memory_order_relaxed); });
+
if (Actions.size() <= 1)
{
std::vector<SubmitResult> Results;
@@ -246,7 +263,7 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
// Collect distinct QueueIds and ensure remote queues exist once per queue
- std::unordered_map<int, Oid> QueueTokens; // QueueId → remote token (0 stays as Zero)
+ std::unordered_map<int, Oid> QueueTokens; // QueueId -> remote token (0 stays as Zero)
for (const Ref<RunnerAction>& Action : Actions)
{
@@ -359,108 +376,141 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
}
}
- // Enqueue job. If the remote returns FailedDependency (424), it means it
- // cannot resolve the worker/function — re-register the worker and retry once.
+ // Submit the action to the remote. In eager-attach mode we build a
+ // CbPackage with all referenced attachments upfront to avoid the 404
+ // round-trip. In the default mode we POST the bare object first and
+ // only upload missing attachments if the remote requests them.
+ //
+ // In both modes, FailedDependency (424) triggers a worker re-register
+ // and a single retry.
CbObject Result;
HttpClient::Response WorkResponse;
HttpResponseCode WorkResponseCode{};
- for (int Attempt = 0; Attempt < 2; ++Attempt)
- {
- WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
- WorkResponseCode = WorkResponse.StatusCode;
-
- if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
- {
- ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
- m_Http.GetBaseUri(),
- ActionId);
-
- (void)RegisterWorker(Action->Worker.Descriptor);
- }
- else
- {
- break;
- }
- }
-
- if (WorkResponseCode == HttpResponseCode::OK)
- {
- Result = WorkResponse.AsObject();
- }
- else if (WorkResponseCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Not all attachments are present
-
- // Build response package including all required attachments
-
CbPackage Pkg;
Pkg.SetObject(ActionObj);
- CbObject Response = WorkResponse.AsObject();
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash AttachHash = Field.AsHash();
- for (auto& Item : Response["need"sv])
- {
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ });
+
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, Pkg);
+ WorkResponseCode = WorkResponse.StatusCode;
+
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} - re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
- // No such attachment
-
- return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ break;
}
}
+ }
+ else
+ {
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
+ WorkResponseCode = WorkResponse.StatusCode;
- // Post resulting package
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} - re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+ (void)RegisterWorker(Action->Worker.Descriptor);
+ }
+ else
+ {
+ break;
+ }
+ }
- if (!PayloadResponse)
+ if (WorkResponseCode == HttpResponseCode::NotFound)
{
- ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ // Remote needs attachments - resolve them and retry with a CbPackage
- // TODO: include more information about the failure in the response
+ CbPackage Pkg;
+ Pkg.SetObject(ActionObj);
- return {.IsAccepted = false, .Reason = "HTTP request failed"};
- }
- else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
- {
- Result = PayloadResponse.AsObject();
- }
- else
- {
- // Unexpected response
-
- const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
-
- ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})",
- ActionId,
- m_Http.GetBaseUri(),
- SubmitUrl,
- ResponseStatusCode,
- ToString(ResponseStatusCode));
-
- return {.IsAccepted = false,
- .Reason = fmt::format("unexpected response code {} {} from {}{}",
- ResponseStatusCode,
- ToString(ResponseStatusCode),
- m_Http.GetBaseUri(),
- SubmitUrl)};
+ CbObject Response = WorkResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ }
+ }
+
+ HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (!PayloadResponse)
+ {
+ ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+
+ WorkResponse = std::move(PayloadResponse);
+ WorkResponseCode = WorkResponse.StatusCode;
}
}
+ if (WorkResponseCode == HttpResponseCode::OK)
+ {
+ Result = WorkResponse.AsObject();
+ }
+ else if (!WorkResponse)
+ {
+ ZEN_WARN("submit of action {} to {}{} failed", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+ else if (!IsHttpSuccessCode(WorkResponseCode))
+ {
+ const int Code = static_cast<int>(WorkResponseCode);
+ ZEN_WARN("submit of action {} to {}{} returned {} {}", ActionId, m_Http.GetBaseUri(), SubmitUrl, Code, ToString(Code));
+ return {.IsAccepted = false,
+ .Reason = fmt::format("unexpected response code {} {} from {}{}", Code, ToString(Code), m_Http.GetBaseUri(), SubmitUrl)};
+ }
+
if (Result)
{
if (const int32_t LsnField = Result["lsn"].AsInt32(0))
@@ -512,83 +562,111 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
CbObjectWriter Body;
Body.BeginArray("actions"sv);
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsSeen;
+
for (const Ref<RunnerAction>& Action : Actions)
{
Action->ExecutionLocation = m_HostName;
MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
Body.AddObject(Action->ActionObj);
+
+ if (m_EagerAttach)
+ {
+ Action->ActionObj.IterateAttachments([&](CbFieldView Field) { AttachmentsSeen.insert(Field.AsHash()); });
+ }
}
Body.EndArray();
- // POST the batch
-
- HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
-
- if (Response.StatusCode == HttpResponseCode::OK)
- {
- return ParseBatchResponse(Response, Actions);
- }
+ // In eager-attach mode, build a CbPackage with all referenced attachments
+ // so the remote can accept in a single round-trip. Otherwise POST a bare
+ // CbObject and handle the 404 need-list flow.
- if (Response.StatusCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Server needs attachments — resolve them and retry with a CbPackage
-
- CbObject NeedObj = Response.AsObject();
-
CbPackage Pkg;
Pkg.SetObject(Body.Save());
- for (auto& Item : NeedObj["need"sv])
+ for (const IoHash& AttachHash : AttachmentsSeen)
{
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
-
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
- }
- else
- {
- ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
- return FallbackToIndividualSubmit(Actions);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
}
- HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Pkg);
- if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ if (Response.StatusCode == HttpResponseCode::OK)
{
- return ParseBatchResponse(RetryResponse, Actions);
+ return ParseBatchResponse(Response, Actions);
}
-
- ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
- (int)RetryResponse.StatusCode,
- ToString(RetryResponse.StatusCode));
- return FallbackToIndividualSubmit(Actions);
- }
-
- // Unexpected status or connection error — fall back to individual submission
-
- if (Response)
- {
- ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
- m_Http.GetBaseUri(),
- SubmitUrl,
- (int)Response.StatusCode,
- ToString(Response.StatusCode));
}
else
{
- ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
+
+ if (Response.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(Response, Actions);
+ }
+
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ CbObject NeedObj = Response.AsObject();
+
+ CbPackage Pkg;
+ Pkg.SetObject(Body.Save());
+
+ for (auto& Item : NeedObj["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ ZEN_WARN("batch submit: missing attachment {} - falling back to individual submit", NeedHash);
+ return FallbackToIndividualSubmit(Actions);
+ }
+ }
+
+ HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(RetryResponse, Actions);
+ }
+
+ ZEN_WARN("batch submit retry failed with {} {} - falling back to individual submit",
+ (int)RetryResponse.StatusCode,
+ ToString(RetryResponse.StatusCode));
+ return FallbackToIndividualSubmit(Actions);
+ }
}
+ // Unexpected status or connection error - fall back to individual submission
+
+ ZEN_WARN("batch submit to {}{} failed - falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+
return FallbackToIndividualSubmit(Actions);
}
@@ -849,7 +927,7 @@ RemoteHttpRunner::MonitorThreadFunction()
SweepOnce();
}
- // Signal received — may be a WS wakeup or a quit signal
+ // Signal received - may be a WS wakeup or a quit signal
SweepOnce();
} while (m_MonitorThreadEnabled);
@@ -869,9 +947,10 @@ RemoteHttpRunner::SweepRunningActions()
{
for (auto& FieldIt : Completed["completed"sv])
{
- CbObjectView EntryObj = FieldIt.AsObjectView();
- const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
- std::string_view StateName = EntryObj["state"sv].AsString();
+ CbObjectView EntryObj = FieldIt.AsObjectView();
+ const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
+ std::string_view StateName = EntryObj["state"sv].AsString();
+ std::string_view FailureReason = EntryObj["reason"sv].AsString();
RunnerAction::State RemoteState = RunnerAction::FromString(StateName);
@@ -884,6 +963,7 @@ RemoteHttpRunner::SweepRunningActions()
{
HttpRunningAction CompletedAction = std::move(CompleteIt->second);
CompletedAction.RemoteState = RemoteState;
+ CompletedAction.FailureReason = std::string(FailureReason);
if (RemoteState == RunnerAction::State::Completed && ResponseJob)
{
@@ -927,16 +1007,44 @@ RemoteHttpRunner::SweepRunningActions()
{
const int ActionLsn = HttpAction.Action->ActionLsn;
- ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
- HttpAction.Action->ActionId,
- ActionLsn,
- HttpAction.RemoteActionLsn,
- RunnerAction::ToString(HttpAction.RemoteState));
-
if (HttpAction.RemoteState == RunnerAction::State::Completed)
{
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) completed on {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ m_HostName);
HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
}
+ else if (HttpAction.RemoteState == RunnerAction::State::Failed || HttpAction.RemoteState == RunnerAction::State::Abandoned)
+ {
+ HttpAction.Action->FailureReason = HttpAction.FailureReason;
+ if (HttpAction.FailureReason.empty())
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName);
+ }
+ else
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}: {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName,
+ HttpAction.FailureReason);
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState));
+ }
HttpAction.Action->SetActionState(HttpAction.RemoteState);
}
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index c17d0cf2a..521bf2f82 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,8 +54,10 @@ public:
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
virtual void CancelRemoteQueue(int QueueId) override;
+ [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }
std::string_view GetHostName() const { return m_HostName; }
+ void SetRemoteHostname(std::string_view Hostname);
protected:
LoggerRef Log() { return m_Log; }
@@ -65,12 +67,15 @@ private:
ChunkResolver& m_ChunkResolver;
WorkerThreadPool& m_WorkerPool;
std::string m_HostName;
+ std::string m_DisplayName;
std::string m_BaseUrl;
HttpClient m_Http;
- std::atomic<bool> m_AcceptNewActions{true};
- int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
- int32_t m_MaxBatchSize = 50;
+ std::atomic<bool> m_AcceptNewActions{true};
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ int32_t m_MaxBatchSize = 50;
+ bool m_EagerAttach = true; ///< Send attachments with every submit instead of the two-step 404 retry
+ std::atomic<size_t> m_InFlightSubmissions{0}; // actions currently being submitted over HTTP
struct HttpRunningAction
{
@@ -78,6 +83,7 @@ private:
int RemoteActionLsn = 0; // Remote LSN
RunnerAction::State RemoteState = RunnerAction::State::Failed;
CbPackage ActionResults;
+ std::string FailureReason;
};
RwLock m_RunningLock;
@@ -90,7 +96,7 @@ private:
size_t SweepRunningActions();
RwLock m_QueueTokenLock;
- std::unordered_map<int, Oid> m_RemoteQueueTokens; // local QueueId → remote queue token
+ std::unordered_map<int, Oid> m_RemoteQueueTokens; // local QueueId -> remote queue token
// Stable identity for this runner instance, used as part of the idempotency key when
// creating remote queues. Generated once at construction and never changes.
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index e9a1ae8b6..c6b3e82ea 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -21,6 +21,12 @@ ZEN_THIRD_PARTY_INCLUDES_START
# include <sddl.h>
ZEN_THIRD_PARTY_INCLUDES_END
+// JOB_OBJECT_UILIMIT_ERRORMODE is defined in winuser.h which may be
+// excluded by WIN32_LEAN_AND_MEAN.
+# if !defined(JOB_OBJECT_UILIMIT_ERRORMODE)
+# define JOB_OBJECT_UILIMIT_ERRORMODE 0x00000400
+# endif
+
namespace zen::compute {
using namespace std::literals;
@@ -34,38 +40,67 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
: LocalProcessRunner(Resolver, BaseDir, Deleter, WorkerPool, MaxConcurrentActions)
, m_Sandboxed(Sandboxed)
{
- if (!m_Sandboxed)
+ // Create a job object shared by all child processes. Restricting the
+ // error-mode UI prevents crash dialogs (WER / Dr. Watson) from
+ // blocking the monitor thread when a worker process terminates
+ // abnormally.
+ m_JobObject = CreateJobObjectW(nullptr, nullptr);
+ if (m_JobObject)
{
- return;
+ JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{};
+ ExtLimits.BasicLimitInformation.LimitFlags =
+ JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION | JOB_OBJECT_LIMIT_PRIORITY_CLASS;
+ ExtLimits.BasicLimitInformation.PriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
+ SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits));
+
+ JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{};
+ UiRestrictions.UIRestrictionsClass = JOB_OBJECT_UILIMIT_ERRORMODE;
+ SetInformationJobObject(m_JobObject, JobObjectBasicUIRestrictions, &UiRestrictions, sizeof(UiRestrictions));
+
+ // Set error mode on this process so children inherit it. The
+ // UILIMIT_ERRORMODE restriction above prevents them from clearing
+ // SEM_NOGPFAULTERRORBOX.
+ SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX);
}
- // Build a unique profile name per process to avoid collisions
- m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId());
+ if (m_Sandboxed)
+ {
+ // Build a unique profile name per process to avoid collisions
+ m_AppContainerName = L"zenserver-sandbox-" + std::to_wstring(GetCurrentProcessId());
- // Clean up any stale profile from a previous crash
- DeleteAppContainerProfile(m_AppContainerName.c_str());
+ // Clean up any stale profile from a previous crash
+ DeleteAppContainerProfile(m_AppContainerName.c_str());
- PSID Sid = nullptr;
+ PSID Sid = nullptr;
- HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(),
- m_AppContainerName.c_str(), // display name
- m_AppContainerName.c_str(), // description
- nullptr, // no capabilities
- 0, // capability count
- &Sid);
+ HRESULT Hr = CreateAppContainerProfile(m_AppContainerName.c_str(),
+ m_AppContainerName.c_str(), // display name
+ m_AppContainerName.c_str(), // description
+ nullptr, // no capabilities
+ 0, // capability count
+ &Sid);
- if (FAILED(Hr))
- {
- throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr));
- }
+ if (FAILED(Hr))
+ {
+ throw zen::runtime_error("CreateAppContainerProfile failed: HRESULT 0x{:08X}", static_cast<uint32_t>(Hr));
+ }
- m_AppContainerSid = Sid;
+ m_AppContainerSid = Sid;
+
+ ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName));
+ }
- ZEN_INFO("AppContainer sandboxing enabled for child processes (profile={})", WideToUtf8(m_AppContainerName));
+ StartMonitorThread();
}
WindowsProcessRunner::~WindowsProcessRunner()
{
+ if (m_JobObject)
+ {
+ CloseHandle(m_JobObject);
+ m_JobObject = nullptr;
+ }
+
if (m_AppContainerSid)
{
FreeSid(m_AppContainerSid);
@@ -172,9 +207,9 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
BOOL bInheritHandles = FALSE;
- DWORD dwCreationFlags = 0;
+ DWORD dwCreationFlags = CREATE_SUSPENDED | DETACHED_PROCESS;
- ZEN_DEBUG("Executing: {} (sandboxed={})", WideToUtf8(CommandLine.c_str()), m_Sandboxed);
+ ZEN_DEBUG("{}: '{}' (sandbox='{}')", m_Sandboxed ? "Sandboxing" : "Executing", WideToUtf8(CommandLine.c_str()), Prepared->SandboxPath);
CommandLine.EnsureNulTerminated();
@@ -260,14 +295,21 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
}
}
- CloseHandle(ProcessInformation.hThread);
+ if (m_JobObject)
+ {
+ AssignProcessToJobObject(m_JobObject, ProcessInformation.hProcess);
+ }
- Ref<RunningAction> NewAction{new RunningAction()};
- NewAction->Action = Action;
- NewAction->ProcessHandle = ProcessInformation.hProcess;
- NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+ ResumeThread(ProcessInformation.hThread);
+ CloseHandle(ProcessInformation.hThread);
{
+ Ref<RunningAction> NewAction{new RunningAction()};
+ NewAction->Action = Action;
+ NewAction->ProcessHandle = ProcessInformation.hProcess;
+ NewAction->Pid = ProcessInformation.dwProcessId;
+ NewAction->SandboxPath = std::move(Prepared->SandboxPath);
+
RwLock::ExclusiveLockScope _(m_RunningLock);
m_RunningMap[Prepared->ActionLsn] = std::move(NewAction);
@@ -275,6 +317,8 @@ WindowsProcessRunner::SubmitAction(Ref<RunnerAction> Action)
Action->SetActionState(RunnerAction::State::Running);
+ ZEN_DEBUG("Local runner: action LSN {} -> PID {}", Action->ActionLsn, ProcessInformation.dwProcessId);
+
return SubmitResult{.IsAccepted = true};
}
@@ -294,6 +338,11 @@ WindowsProcessRunner::SweepRunningActions()
if (IsSuccess && ExitCode != STILL_ACTIVE)
{
+ ZEN_DEBUG("Local runner: action LSN {} + PID {} exited with code " ZEN_BRIGHT_WHITE("{}"),
+ Running->Action->ActionLsn,
+ Running->Pid,
+ ExitCode);
+
CloseHandle(Running->ProcessHandle);
Running->ProcessHandle = INVALID_HANDLE_VALUE;
Running->ExitCode = ExitCode;
@@ -436,7 +485,7 @@ WindowsProcessRunner::SampleProcessCpu(RunningAction& Running)
const uint64_t CurrentOsTicks = FtToU64(KernelTime) + FtToU64(UserTime);
const uint64_t NowTicks = GetHifreqTimerValue();
- // Cumulative CPU seconds (absolute, available from first sample): 100ns → seconds
+ // Cumulative CPU seconds (absolute, available from first sample): 100ns -> seconds
Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 10'000'000.0), std::memory_order_relaxed);
if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
@@ -445,7 +494,7 @@ WindowsProcessRunner::SampleProcessCpu(RunningAction& Running)
if (ElapsedMs > 0)
{
const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
- // 100ns → ms: divide by 10000; then as percent of elapsed ms
+ // 100ns -> ms: divide by 10000; then as percent of elapsed ms
const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 10000.0 / ElapsedMs * 100.0);
Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/windowsrunner.h b/src/zencompute/runners/windowsrunner.h
index 9f2385cc4..adeaf02fc 100644
--- a/src/zencompute/runners/windowsrunner.h
+++ b/src/zencompute/runners/windowsrunner.h
@@ -46,6 +46,7 @@ private:
bool m_Sandboxed = false;
PSID m_AppContainerSid = nullptr;
std::wstring m_AppContainerName;
+ HANDLE m_JobObject = nullptr;
};
} // namespace zen::compute
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
index 506bec73b..29ab93663 100644
--- a/src/zencompute/runners/winerunner.cpp
+++ b/src/zencompute/runners/winerunner.cpp
@@ -36,6 +36,8 @@ WineProcessRunner::WineProcessRunner(ChunkResolver& Resolver,
sigemptyset(&Action.sa_mask);
Action.sa_handler = SIG_DFL;
sigaction(SIGCHLD, &Action, nullptr);
+
+ StartMonitorThread();
}
SubmitResult
@@ -94,7 +96,9 @@ WineProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
+
if (chdir(SandboxPathStr.c_str()) != 0)
{
_exit(127);