aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners')
-rw-r--r--src/zencompute/runners/functionrunner.cpp120
-rw-r--r--src/zencompute/runners/functionrunner.h27
-rw-r--r--src/zencompute/runners/linuxrunner.cpp20
-rw-r--r--src/zencompute/runners/localrunner.cpp23
-rw-r--r--src/zencompute/runners/macrunner.cpp16
-rw-r--r--src/zencompute/runners/managedrunner.cpp2
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp366
-rw-r--r--src/zencompute/runners/remotehttprunner.h14
-rw-r--r--src/zencompute/runners/windowsrunner.cpp8
-rw-r--r--src/zencompute/runners/winerunner.cpp4
10 files changed, 420 insertions, 180 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 67e12b84e..34bf065b4 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,9 +6,15 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/string.h>
+# include <zencore/timer.h>
# include <zencore/trace.h>
+# include <zencore/workthreadpool.h>
# include <fmt/format.h>
+# include <future>
# include <vector>
namespace zen::compute {
@@ -118,23 +124,34 @@ std::vector<SubmitResult>
BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
- RwLock::SharedLockScope _(m_RunnersLock);
- const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ // Snapshot runners and query capacity under the lock, then release
+ // before submitting - HTTP submissions to remote runners can take
+ // hundreds of milliseconds and we must not hold m_RunnersLock during I/O.
- if (RunnerCount == 0)
- {
- return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
- }
+ std::vector<Ref<FunctionRunner>> Runners;
+ std::vector<size_t> Capacities;
+ std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions;
+ size_t TotalCapacity = 0;
- // Query capacity per runner and compute total
- std::vector<size_t> Capacities(RunnerCount);
- size_t TotalCapacity = 0;
+ m_RunnersLock.WithSharedLock([&] {
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ Runners.assign(m_Runners.begin(), m_Runners.end());
+ Capacities.resize(RunnerCount);
+ PerRunnerActions.resize(RunnerCount);
- for (int i = 0; i < RunnerCount; ++i)
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ Capacities[i] = Runners[i]->QueryCapacity();
+ TotalCapacity += Capacities[i];
+ }
+ });
+
+ const int RunnerCount = gsl::narrow<int>(Runners.size());
+
+ if (RunnerCount == 0)
{
- Capacities[i] = m_Runners[i]->QueryCapacity();
- TotalCapacity += Capacities[i];
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
}
if (TotalCapacity == 0)
@@ -143,9 +160,8 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
// Distribute actions across runners proportionally to their available capacity
- std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
- std::vector<size_t> ActionRunnerIndex(Actions.size());
- size_t ActionIdx = 0;
+ std::vector<size_t> ActionRunnerIndex(Actions.size());
+ size_t ActionIdx = 0;
for (int i = 0; i < RunnerCount; ++i)
{
@@ -176,14 +192,74 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Submit batches per runner
+ // Submit batches per runner - in parallel when a worker pool is available
+
std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+ int ActiveRunnerCount = 0;
for (int i = 0; i < RunnerCount; ++i)
{
if (!PerRunnerActions[i].empty())
{
- PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+ ++ActiveRunnerCount;
+ }
+ }
+
+ static constexpr uint64_t SubmitWarnThresholdMs = 500;
+
+ auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
+ Stopwatch Timer;
+
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
+
+ uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
+ if (ElapsedMs >= SubmitWarnThresholdMs)
+ {
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
+ PerRunnerActions[RunnerIndex].size(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
+ ElapsedMs);
+ }
+ };
+
+ if (m_WorkerPool && ActiveRunnerCount > 1)
+ {
+ std::vector<std::future<void>> Futures(RunnerCount);
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ std::packaged_task<void()> Task([&SubmitToRunner, i]() { SubmitToRunner(i); });
+
+ Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog);
+ }
+ }
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (Futures[i].valid())
+ {
+ Futures[i].get();
+ }
+ }
+ }
+ else
+ {
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ SubmitToRunner(i);
+ }
}
}
@@ -309,10 +385,11 @@ RunnerAction::RetractAction()
bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed, Abandoned, or Retracted states
+ // Only allow reset from Failed, Abandoned, Rejected, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Rejected &&
+ CurrentState != State::Retracted)
{
return false;
}
@@ -333,11 +410,12 @@ RunnerAction::ResetActionStateToPending()
// Clear execution fields
ExecutionLocation.clear();
+ FailureReason.clear();
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count (skip for Retracted — nothing failed)
- if (CurrentState != State::Retracted)
+ // Increment retry count (skip for Retracted/Rejected - nothing failed)
+ if (CurrentState != State::Retracted && CurrentState != State::Rejected)
{
RetryCount.fetch_add(1, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index 56c3f3af0..371a60b7a 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -10,6 +10,10 @@
# include <filesystem>
# include <vector>
+namespace zen {
+class WorkerThreadPool;
+}
+
namespace zen::compute {
struct SubmitResult
@@ -37,6 +41,22 @@ public:
[[nodiscard]] virtual bool IsHealthy() = 0;
[[nodiscard]] virtual size_t QueryCapacity();
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; }
+
+ // Accumulated stats from the most recent SubmitActions call.
+ // Reset before each call, populated by the runner implementation.
+ struct SubmitStats
+ {
+ std::atomic<size_t> TotalAttachments{0};
+ std::atomic<uint64_t> TotalAttachmentBytes{0};
+
+ void Reset()
+ {
+ TotalAttachments.store(0, std::memory_order_relaxed);
+ TotalAttachmentBytes.store(0, std::memory_order_relaxed);
+ }
+ };
+ SubmitStats m_LastSubmitStats;
// Best-effort cancellation of a specific in-flight action. Returns true if the
// cancellation signal was successfully sent. The action will transition to Cancelled
@@ -68,6 +88,8 @@ public:
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
+ void SetWorkerPool(WorkerThreadPool* Pool) { m_WorkerPool = Pool; }
+
size_t GetRunnerCount()
{
return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); });
@@ -79,6 +101,7 @@ protected:
RwLock m_RunnersLock;
std::vector<Ref<FunctionRunner>> m_Runners;
std::atomic<int> m_NextSubmitIndex{0};
+ WorkerThreadPool* m_WorkerPool = nullptr;
};
/** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal.
@@ -151,6 +174,7 @@ struct RunnerAction : public RefCounted
CbObject ActionObj;
int Priority = 0;
std::string ExecutionLocation; // "local" or remote hostname
+ std::string FailureReason; // human-readable reason when action fails (empty on success)
// CPU usage and total CPU time of the running process, sampled periodically by the local runner.
// CpuUsagePercent: -1.0 means not yet sampled; >=0.0 is the most recent reading as a percentage.
@@ -168,6 +192,7 @@ struct RunnerAction : public RefCounted
Completed, // Finished successfully with results available
Failed, // Execution failed (transient error, eligible for retry)
Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon)
+ Rejected, // Runner declined (e.g. at capacity) - rescheduled without retry cost
Cancelled, // Intentional user cancellation (never retried)
Retracted, // Pulled back for rescheduling on a different runner (no retry cost)
_Count
@@ -194,6 +219,8 @@ struct RunnerAction : public RefCounted
return "Failed";
case State::Abandoned:
return "Abandoned";
+ case State::Rejected:
+ return "Rejected";
case State::Cancelled:
return "Cancelled";
case State::Retracted:
diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp
index 9055005d9..be4274823 100644
--- a/src/zencompute/runners/linuxrunner.cpp
+++ b/src/zencompute/runners/linuxrunner.cpp
@@ -195,7 +195,7 @@ namespace {
WriteErrorAndExit(ErrorPipeFd, "bind mount /lib failed", errno);
}
- // /lib64 (optional — not all distros have it)
+ // /lib64 (optional - not all distros have it)
{
struct stat St;
if (stat("/lib64", &St) == 0 && S_ISDIR(St.st_mode))
@@ -208,7 +208,7 @@ namespace {
}
}
- // /etc (required — for resolv.conf, ld.so.cache, etc.)
+ // /etc (required - for resolv.conf, ld.so.cache, etc.)
if (MkdirIfNeeded(BuildPath("etc"), 0755) != 0)
{
WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/etc failed", errno);
@@ -218,7 +218,7 @@ namespace {
WriteErrorAndExit(ErrorPipeFd, "bind mount /etc failed", errno);
}
- // /worker — bind-mount worker directory (contains the executable)
+ // /worker - bind-mount worker directory (contains the executable)
if (MkdirIfNeeded(BuildPath("worker"), 0755) != 0)
{
WriteErrorAndExit(ErrorPipeFd, "mkdir sandbox/worker failed", errno);
@@ -430,11 +430,12 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
- // Close read end of error pipe — child only writes
+ // Close read end of error pipe - child only writes
close(ErrorPipe[0]);
SetupNamespaceSandbox(SandboxPathStr.c_str(), CurrentUid, CurrentGid, WorkerPathStr.c_str(), ErrorPipe[1]);
@@ -461,7 +462,7 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (m_Sandboxed)
{
- // Close write end of error pipe — parent only reads
+ // Close write end of error pipe - parent only reads
close(ErrorPipe[1]);
// Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
@@ -481,7 +482,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
@@ -677,7 +679,7 @@ ReadProcStatCpuTicks(pid_t Pid)
Buf[Len] = '\0';
- // Skip past "pid (name) " — find last ')' to handle names containing spaces or parens
+ // Skip past "pid (name) " - find last ')' to handle names containing spaces or parens
const char* P = strrchr(Buf, ')');
if (!P)
{
@@ -707,7 +709,7 @@ LinuxProcessRunner::SampleProcessCpu(RunningAction& Running)
if (CurrentOsTicks == 0)
{
- // Process gone or /proc entry unreadable — record timestamp without updating usage
+ // Process gone or /proc entry unreadable - record timestamp without updating usage
Running.LastCpuSampleTicks = NowTicks;
Running.LastCpuOsTicks = 0;
return;
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index 1b748c0e5..259965e23 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -337,7 +337,7 @@ LocalProcessRunner::PrepareActionSubmission(Ref<RunnerAction> Action)
SubmitResult
LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
{
- // Base class is not directly usable — platform subclasses override this
+ // Base class is not directly usable - platform subclasses override this
ZEN_UNUSED(Action);
return SubmitResult{.IsAccepted = false};
}
@@ -357,14 +357,21 @@ LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
- if (!std::filesystem::exists(WorkerDir))
+ // worker.zcb is written as the last step of ManifestWorker, so its presence
+ // indicates a complete manifest. If the directory exists but the marker is
+ // missing, a previous manifest was interrupted and we need to start over.
+ bool NeedsManifest = !std::filesystem::exists(WorkerDir / "worker.zcb");
+
+ if (NeedsManifest)
{
_.ReleaseNow();
RwLock::ExclusiveLockScope $(m_WorkerLock);
- if (!std::filesystem::exists(WorkerDir))
+ if (!std::filesystem::exists(WorkerDir / "worker.zcb"))
{
+ std::error_code Ec;
+ std::filesystem::remove_all(WorkerDir, Ec);
ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
}
}
@@ -613,7 +620,7 @@ LocalProcessRunner::MonitorThreadFunction()
void
LocalProcessRunner::CancelRunningActions()
{
- // Base class is not directly usable — platform subclasses override this
+ // Base class is not directly usable - platform subclasses override this
}
void
@@ -673,9 +680,15 @@ LocalProcessRunner::ProcessCompletedActions(std::vector<Ref<RunningAction>>& Com
}
catch (std::exception& Ex)
{
- ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+ Running->Action->FailureReason = fmt::format("exception gathering outputs: {}", Ex.what());
+ ZEN_ERROR("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
}
}
+ else
+ {
+ Running->Action->FailureReason = fmt::format("process exited with code {}", Running->ExitCode);
+ ZEN_WARN("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
+ }
// Failed - clean up the sandbox in the background.
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
index c2ccca9a6..ab24d4672 100644
--- a/src/zencompute/runners/macrunner.cpp
+++ b/src/zencompute/runners/macrunner.cpp
@@ -211,18 +211,19 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
- // Close read end of error pipe — child only writes
+ // Close read end of error pipe - child only writes
close(ErrorPipe[0]);
// Apply Seatbelt sandbox profile
char* ErrorBuf = nullptr;
if (sandbox_init(SandboxProfile.c_str(), 0, &ErrorBuf) != 0)
{
- // sandbox_init failed — write error to pipe and exit
+ // sandbox_init failed - write error to pipe and exit
if (ErrorBuf)
{
WriteErrorAndExit(ErrorPipe[1], ErrorBuf, 0);
@@ -261,7 +262,7 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (m_Sandboxed)
{
- // Close write end of error pipe — parent only reads
+ // Close write end of error pipe - parent only reads
close(ErrorPipe[1]);
// Read from error pipe. If execve succeeded, pipe was closed by O_CLOEXEC
@@ -281,7 +282,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
@@ -469,7 +471,7 @@ MacProcessRunner::SampleProcessCpu(RunningAction& Running)
const uint64_t CurrentOsTicks = Info.pti_total_user + Info.pti_total_system;
const uint64_t NowTicks = GetHifreqTimerValue();
- // Cumulative CPU seconds (absolute, available from first sample): ns → seconds
+ // Cumulative CPU seconds (absolute, available from first sample): ns -> seconds
Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 1'000'000'000.0), std::memory_order_relaxed);
if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
@@ -478,7 +480,7 @@ MacProcessRunner::SampleProcessCpu(RunningAction& Running)
if (ElapsedMs > 0)
{
const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
- // ns → ms: divide by 1,000,000; then as percent of elapsed ms
+ // ns -> ms: divide by 1,000,000; then as percent of elapsed ms
const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 1'000'000.0 / ElapsedMs * 100.0);
Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp
index e4a7ba388..a4f586852 100644
--- a/src/zencompute/runners/managedrunner.cpp
+++ b/src/zencompute/runners/managedrunner.cpp
@@ -128,7 +128,7 @@ ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action)
CreateProcOptions Options;
Options.WorkingDirectory = &Prepared->SandboxPath;
- Options.Flags = CreateProcOptions::Flag_NoConsole;
+ Options.Flags = CreateProcOptions::Flag_NoConsole | CreateProcOptions::Flag_BelowNormalPriority;
Options.Environment = std::move(EnvPairs);
const int32_t ActionLsn = Prepared->ActionLsn;
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index ce6a81173..08f381b7f 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -20,6 +20,7 @@
# include <zenstore/cidstore.h>
# include <span>
+# include <unordered_set>
//////////////////////////////////////////////////////////////////////////
@@ -38,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
+, m_DisplayName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
@@ -59,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+void
+RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname)
+{
+ if (!Hostname.empty())
+ {
+ m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname);
+ }
+}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
Shutdown();
@@ -108,6 +119,7 @@ RemoteHttpRunner::Shutdown()
for (auto& [RemoteLsn, HttpAction] : Remaining)
{
ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+ HttpAction.Action->FailureReason = "remote runner shutdown";
HttpAction.Action->SetActionState(RunnerAction::State::Failed);
}
}
@@ -213,11 +225,13 @@ RemoteHttpRunner::QueryCapacity()
return 0;
}
- // Estimate how much more work we're ready to accept
+ // Estimate how much more work we're ready to accept.
+ // Include actions currently being submitted over HTTP so we don't
+ // keep queueing new submissions while previous ones are still in flight.
RwLock::SharedLockScope _{m_RunningLock};
- size_t RunningCount = m_RemoteRunningMap.size();
+ size_t RunningCount = m_RemoteRunningMap.size() + m_InFlightSubmissions.load(std::memory_order_relaxed);
if (RunningCount >= size_t(m_MaxRunningActions))
{
@@ -232,6 +246,9 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions");
+ m_InFlightSubmissions.fetch_add(Actions.size(), std::memory_order_relaxed);
+ auto InFlightGuard = MakeGuard([&] { m_InFlightSubmissions.fetch_sub(Actions.size(), std::memory_order_relaxed); });
+
if (Actions.size() <= 1)
{
std::vector<SubmitResult> Results;
@@ -246,7 +263,7 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
// Collect distinct QueueIds and ensure remote queues exist once per queue
- std::unordered_map<int, Oid> QueueTokens; // QueueId → remote token (0 stays as Zero)
+ std::unordered_map<int, Oid> QueueTokens; // QueueId -> remote token (0 stays as Zero)
for (const Ref<RunnerAction>& Action : Actions)
{
@@ -359,108 +376,141 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
}
}
- // Enqueue job. If the remote returns FailedDependency (424), it means it
- // cannot resolve the worker/function — re-register the worker and retry once.
+ // Submit the action to the remote. In eager-attach mode we build a
+ // CbPackage with all referenced attachments upfront to avoid the 404
+ // round-trip. In the default mode we POST the bare object first and
+ // only upload missing attachments if the remote requests them.
+ //
+ // In both modes, FailedDependency (424) triggers a worker re-register
+ // and a single retry.
CbObject Result;
HttpClient::Response WorkResponse;
HttpResponseCode WorkResponseCode{};
- for (int Attempt = 0; Attempt < 2; ++Attempt)
- {
- WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
- WorkResponseCode = WorkResponse.StatusCode;
-
- if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
- {
- ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
- m_Http.GetBaseUri(),
- ActionId);
-
- (void)RegisterWorker(Action->Worker.Descriptor);
- }
- else
- {
- break;
- }
- }
-
- if (WorkResponseCode == HttpResponseCode::OK)
- {
- Result = WorkResponse.AsObject();
- }
- else if (WorkResponseCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Not all attachments are present
-
- // Build response package including all required attachments
-
CbPackage Pkg;
Pkg.SetObject(ActionObj);
- CbObject Response = WorkResponse.AsObject();
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash AttachHash = Field.AsHash();
- for (auto& Item : Response["need"sv])
- {
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ });
+
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, Pkg);
+ WorkResponseCode = WorkResponse.StatusCode;
+
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} - re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
- // No such attachment
-
- return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ break;
}
}
+ }
+ else
+ {
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
+ WorkResponseCode = WorkResponse.StatusCode;
- // Post resulting package
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} - re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+ (void)RegisterWorker(Action->Worker.Descriptor);
+ }
+ else
+ {
+ break;
+ }
+ }
- if (!PayloadResponse)
+ if (WorkResponseCode == HttpResponseCode::NotFound)
{
- ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ // Remote needs attachments - resolve them and retry with a CbPackage
- // TODO: include more information about the failure in the response
+ CbPackage Pkg;
+ Pkg.SetObject(ActionObj);
- return {.IsAccepted = false, .Reason = "HTTP request failed"};
- }
- else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
- {
- Result = PayloadResponse.AsObject();
- }
- else
- {
- // Unexpected response
-
- const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
-
- ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})",
- ActionId,
- m_Http.GetBaseUri(),
- SubmitUrl,
- ResponseStatusCode,
- ToString(ResponseStatusCode));
-
- return {.IsAccepted = false,
- .Reason = fmt::format("unexpected response code {} {} from {}{}",
- ResponseStatusCode,
- ToString(ResponseStatusCode),
- m_Http.GetBaseUri(),
- SubmitUrl)};
+ CbObject Response = WorkResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ }
+ }
+
+ HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (!PayloadResponse)
+ {
+ ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+
+ WorkResponse = std::move(PayloadResponse);
+ WorkResponseCode = WorkResponse.StatusCode;
}
}
+ if (WorkResponseCode == HttpResponseCode::OK)
+ {
+ Result = WorkResponse.AsObject();
+ }
+ else if (!WorkResponse)
+ {
+ ZEN_WARN("submit of action {} to {}{} failed", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+ else if (!IsHttpSuccessCode(WorkResponseCode))
+ {
+ const int Code = static_cast<int>(WorkResponseCode);
+ ZEN_WARN("submit of action {} to {}{} returned {} {}", ActionId, m_Http.GetBaseUri(), SubmitUrl, Code, ToString(Code));
+ return {.IsAccepted = false,
+ .Reason = fmt::format("unexpected response code {} {} from {}{}", Code, ToString(Code), m_Http.GetBaseUri(), SubmitUrl)};
+ }
+
if (Result)
{
if (const int32_t LsnField = Result["lsn"].AsInt32(0))
@@ -512,83 +562,111 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
CbObjectWriter Body;
Body.BeginArray("actions"sv);
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsSeen;
+
for (const Ref<RunnerAction>& Action : Actions)
{
Action->ExecutionLocation = m_HostName;
MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
Body.AddObject(Action->ActionObj);
+
+ if (m_EagerAttach)
+ {
+ Action->ActionObj.IterateAttachments([&](CbFieldView Field) { AttachmentsSeen.insert(Field.AsHash()); });
+ }
}
Body.EndArray();
- // POST the batch
-
- HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
-
- if (Response.StatusCode == HttpResponseCode::OK)
- {
- return ParseBatchResponse(Response, Actions);
- }
+ // In eager-attach mode, build a CbPackage with all referenced attachments
+ // so the remote can accept in a single round-trip. Otherwise POST a bare
+ // CbObject and handle the 404 need-list flow.
- if (Response.StatusCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Server needs attachments — resolve them and retry with a CbPackage
-
- CbObject NeedObj = Response.AsObject();
-
CbPackage Pkg;
Pkg.SetObject(Body.Save());
- for (auto& Item : NeedObj["need"sv])
+ for (const IoHash& AttachHash : AttachmentsSeen)
{
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
-
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
- }
- else
- {
- ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
- return FallbackToIndividualSubmit(Actions);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
}
- HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Pkg);
- if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ if (Response.StatusCode == HttpResponseCode::OK)
{
- return ParseBatchResponse(RetryResponse, Actions);
+ return ParseBatchResponse(Response, Actions);
}
-
- ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
- (int)RetryResponse.StatusCode,
- ToString(RetryResponse.StatusCode));
- return FallbackToIndividualSubmit(Actions);
- }
-
- // Unexpected status or connection error — fall back to individual submission
-
- if (Response)
- {
- ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
- m_Http.GetBaseUri(),
- SubmitUrl,
- (int)Response.StatusCode,
- ToString(Response.StatusCode));
}
else
{
- ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
+
+ if (Response.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(Response, Actions);
+ }
+
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ CbObject NeedObj = Response.AsObject();
+
+ CbPackage Pkg;
+ Pkg.SetObject(Body.Save());
+
+ for (auto& Item : NeedObj["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ ZEN_WARN("batch submit: missing attachment {} - falling back to individual submit", NeedHash);
+ return FallbackToIndividualSubmit(Actions);
+ }
+ }
+
+ HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(RetryResponse, Actions);
+ }
+
+ ZEN_WARN("batch submit retry failed with {} {} - falling back to individual submit",
+ (int)RetryResponse.StatusCode,
+ ToString(RetryResponse.StatusCode));
+ return FallbackToIndividualSubmit(Actions);
+ }
}
+ // Unexpected status or connection error - fall back to individual submission
+
+ ZEN_WARN("batch submit to {}{} failed - falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+
return FallbackToIndividualSubmit(Actions);
}
@@ -849,7 +927,7 @@ RemoteHttpRunner::MonitorThreadFunction()
SweepOnce();
}
- // Signal received — may be a WS wakeup or a quit signal
+ // Signal received - may be a WS wakeup or a quit signal
SweepOnce();
} while (m_MonitorThreadEnabled);
@@ -869,9 +947,10 @@ RemoteHttpRunner::SweepRunningActions()
{
for (auto& FieldIt : Completed["completed"sv])
{
- CbObjectView EntryObj = FieldIt.AsObjectView();
- const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
- std::string_view StateName = EntryObj["state"sv].AsString();
+ CbObjectView EntryObj = FieldIt.AsObjectView();
+ const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
+ std::string_view StateName = EntryObj["state"sv].AsString();
+ std::string_view FailureReason = EntryObj["reason"sv].AsString();
RunnerAction::State RemoteState = RunnerAction::FromString(StateName);
@@ -884,6 +963,7 @@ RemoteHttpRunner::SweepRunningActions()
{
HttpRunningAction CompletedAction = std::move(CompleteIt->second);
CompletedAction.RemoteState = RemoteState;
+ CompletedAction.FailureReason = std::string(FailureReason);
if (RemoteState == RunnerAction::State::Completed && ResponseJob)
{
@@ -927,16 +1007,44 @@ RemoteHttpRunner::SweepRunningActions()
{
const int ActionLsn = HttpAction.Action->ActionLsn;
- ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
- HttpAction.Action->ActionId,
- ActionLsn,
- HttpAction.RemoteActionLsn,
- RunnerAction::ToString(HttpAction.RemoteState));
-
if (HttpAction.RemoteState == RunnerAction::State::Completed)
{
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) completed on {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ m_HostName);
HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
}
+ else if (HttpAction.RemoteState == RunnerAction::State::Failed || HttpAction.RemoteState == RunnerAction::State::Abandoned)
+ {
+ HttpAction.Action->FailureReason = HttpAction.FailureReason;
+ if (HttpAction.FailureReason.empty())
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName);
+ }
+ else
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}: {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName,
+ HttpAction.FailureReason);
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState));
+ }
HttpAction.Action->SetActionState(HttpAction.RemoteState);
}
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index c17d0cf2a..521bf2f82 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,8 +54,10 @@ public:
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
virtual void CancelRemoteQueue(int QueueId) override;
+ [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }
std::string_view GetHostName() const { return m_HostName; }
+ void SetRemoteHostname(std::string_view Hostname);
protected:
LoggerRef Log() { return m_Log; }
@@ -65,12 +67,15 @@ private:
ChunkResolver& m_ChunkResolver;
WorkerThreadPool& m_WorkerPool;
std::string m_HostName;
+ std::string m_DisplayName;
std::string m_BaseUrl;
HttpClient m_Http;
- std::atomic<bool> m_AcceptNewActions{true};
- int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
- int32_t m_MaxBatchSize = 50;
+ std::atomic<bool> m_AcceptNewActions{true};
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ int32_t m_MaxBatchSize = 50;
+ bool m_EagerAttach = true; ///< Send attachments with every submit instead of the two-step 404 retry
+ std::atomic<size_t> m_InFlightSubmissions{0}; // actions currently being submitted over HTTP
struct HttpRunningAction
{
@@ -78,6 +83,7 @@ private:
int RemoteActionLsn = 0; // Remote LSN
RunnerAction::State RemoteState = RunnerAction::State::Failed;
CbPackage ActionResults;
+ std::string FailureReason;
};
RwLock m_RunningLock;
@@ -90,7 +96,7 @@ private:
size_t SweepRunningActions();
RwLock m_QueueTokenLock;
- std::unordered_map<int, Oid> m_RemoteQueueTokens; // local QueueId → remote queue token
+ std::unordered_map<int, Oid> m_RemoteQueueTokens; // local QueueId -> remote queue token
// Stable identity for this runner instance, used as part of the idempotency key when
// creating remote queues. Generated once at construction and never changes.
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index 92ee65c2d..c6b3e82ea 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -48,7 +48,9 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
if (m_JobObject)
{
JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{};
- ExtLimits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION;
+ ExtLimits.BasicLimitInformation.LimitFlags =
+ JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION | JOB_OBJECT_LIMIT_PRIORITY_CLASS;
+ ExtLimits.BasicLimitInformation.PriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits));
JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{};
@@ -483,7 +485,7 @@ WindowsProcessRunner::SampleProcessCpu(RunningAction& Running)
const uint64_t CurrentOsTicks = FtToU64(KernelTime) + FtToU64(UserTime);
const uint64_t NowTicks = GetHifreqTimerValue();
- // Cumulative CPU seconds (absolute, available from first sample): 100ns → seconds
+ // Cumulative CPU seconds (absolute, available from first sample): 100ns -> seconds
Running.Action->CpuSeconds.store(static_cast<float>(static_cast<double>(CurrentOsTicks) / 10'000'000.0), std::memory_order_relaxed);
if (Running.LastCpuSampleTicks != 0 && Running.LastCpuOsTicks != 0)
@@ -492,7 +494,7 @@ WindowsProcessRunner::SampleProcessCpu(RunningAction& Running)
if (ElapsedMs > 0)
{
const uint64_t DeltaOsTicks = CurrentOsTicks - Running.LastCpuOsTicks;
- // 100ns → ms: divide by 10000; then as percent of elapsed ms
+ // 100ns -> ms: divide by 10000; then as percent of elapsed ms
const float CpuPct = static_cast<float>(static_cast<double>(DeltaOsTicks) / 10000.0 / ElapsedMs * 100.0);
Running.Action->CpuUsagePercent.store(CpuPct, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
index b4fafb467..29ab93663 100644
--- a/src/zencompute/runners/winerunner.cpp
+++ b/src/zencompute/runners/winerunner.cpp
@@ -96,7 +96,9 @@ WineProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process - lower priority so workers don't starve the main server
+ nice(5);
+
if (chdir(SandboxPathStr.c_str()) != 0)
{
_exit(127);