about summary refs log tree commit diff
path: root/src/zencompute
diff options
context:
space:
mode:
author    Stefan Boberg <[email protected]>  2026-04-13 16:38:16 +0200
committer GitHub Enterprise <[email protected]>  2026-04-13 16:38:16 +0200
commit    795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch)
tree      7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute
parent    5.8.4-pre2 (diff)
download  zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz
          zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler 
lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zencompute')
-rw-r--r--  src/zencompute/CLAUDE.md                                     7
-rw-r--r--  src/zencompute/computeservice.cpp                          164
-rw-r--r--  src/zencompute/httpcomputeservice.cpp                       95
-rw-r--r--  src/zencompute/httporchestrator.cpp                        135
-rw-r--r--  src/zencompute/include/zencompute/computeservice.h           7
-rw-r--r--  src/zencompute/include/zencompute/httpcomputeservice.h      4
-rw-r--r--  src/zencompute/include/zencompute/httporchestrator.h        17
-rw-r--r--  src/zencompute/include/zencompute/orchestratorservice.h     12
-rw-r--r--  src/zencompute/include/zencompute/provisionerstate.h        38
-rw-r--r--  src/zencompute/orchestratorservice.cpp                      29
-rw-r--r--  src/zencompute/runners/functionrunner.cpp                  120
-rw-r--r--  src/zencompute/runners/functionrunner.h                     27
-rw-r--r--  src/zencompute/runners/linuxrunner.cpp                       6
-rw-r--r--  src/zencompute/runners/localrunner.cpp                      19
-rw-r--r--  src/zencompute/runners/macrunner.cpp                         6
-rw-r--r--  src/zencompute/runners/managedrunner.cpp                     2
-rw-r--r--  src/zencompute/runners/remotehttprunner.cpp                360
-rw-r--r--  src/zencompute/runners/remotehttprunner.h                   12
-rw-r--r--  src/zencompute/runners/windowsrunner.cpp                     4
-rw-r--r--  src/zencompute/runners/winerunner.cpp                        4
20 files changed, 848 insertions, 220 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md
index 750879d5a..bb574edc2 100644
--- a/src/zencompute/CLAUDE.md
+++ b/src/zencompute/CLAUDE.md
@@ -218,6 +218,13 @@ Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `Han
**Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock.
+**Lock ordering:** When acquiring multiple session-level locks, always acquire in this order to avoid deadlocks:
+1. `m_ActionMapLock` (session action maps)
+2. `QueueEntry::m_Lock` (per-queue state)
+3. `m_ActionHistoryLock` (action history ring)
+
+Never acquire an earlier lock while holding a later one (e.g. never acquire `m_ActionMapLock` while holding `QueueEntry::m_Lock`).
+
**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.
**Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates.
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index aaf34cbe2..852e93fa0 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -121,6 +121,8 @@ struct ComputeServiceSession::Impl
, m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
, m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
{
+ m_RemoteRunnerGroup.SetWorkerPool(&m_RemoteSubmitPool);
+
// Create a non-expiring, non-deletable implicit queue for legacy endpoints
auto Result = CreateQueue("implicit"sv, {}, {});
m_ImplicitQueueId = Result.QueueId;
@@ -240,8 +242,9 @@ struct ComputeServiceSession::Impl
// Recording
- void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
- void StopRecording();
+ bool StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+ bool StopRecording();
+ bool IsRecording() const;
std::unique_ptr<ActionRecorder> m_Recorder;
@@ -615,6 +618,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState()
m_KnownWorkerUris.insert(UriStr);
auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool);
+ NewRunner->SetRemoteHostname(Hostname);
SyncWorkersToRunner(*NewRunner);
m_RemoteRunnerGroup.AddRunner(NewRunner);
}
@@ -716,24 +720,44 @@ ComputeServiceSession::Impl::ShutdownRunners()
m_RemoteRunnerGroup.Shutdown();
}
-void
+bool
ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
{
+ if (m_Recorder)
+ {
+ ZEN_WARN("recording is already active");
+ return false;
+ }
+
ZEN_INFO("starting recording to '{}'", RecordingPath);
m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
ZEN_INFO("started recording to '{}'", RecordingPath);
+ return true;
}
-void
+bool
ComputeServiceSession::Impl::StopRecording()
{
+ if (!m_Recorder)
+ {
+ ZEN_WARN("no recording is active");
+ return false;
+ }
+
ZEN_INFO("stopping recording");
m_Recorder = nullptr;
ZEN_INFO("stopped recording");
+ return true;
+}
+
+bool
+ComputeServiceSession::Impl::IsRecording() const
+{
+ return m_Recorder != nullptr;
}
std::vector<ComputeServiceSession::RunningActionInfo>
@@ -1128,6 +1152,10 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo)
Cbo.BeginObject();
Cbo << "lsn"sv << Lsn;
Cbo << "state"sv << RunnerAction::ToString(Action->ActionState());
+ if (!Action->FailureReason.empty())
+ {
+ Cbo << "reason"sv << Action->FailureReason;
+ }
Cbo.EndObject();
}
});
@@ -1416,8 +1444,8 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo)
if (Queue)
{
- Queue->m_Lock.WithSharedLock([&] {
- m_ActionMapLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
+ Queue->m_Lock.WithSharedLock([&] {
for (int Lsn : Queue->FinishedLsns)
{
if (m_ResultsMap.contains(Lsn))
@@ -1530,12 +1558,12 @@ ComputeServiceSession::Impl::SchedulePendingActions()
static Stopwatch DumpRunningTimer;
auto _ = MakeGuard([&] {
- ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
- ScheduledCount,
- RunningCount,
- m_RetiredCount.load(),
- PendingCount,
- ResultCount);
+ ZEN_DEBUG("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+ ScheduledCount,
+ RunningCount,
+ m_RetiredCount.load(),
+ PendingCount,
+ ResultCount);
if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
{
@@ -1584,13 +1612,13 @@ ComputeServiceSession::Impl::SchedulePendingActions()
// Also note that the m_PendingActions list is not maintained
// here, that's done periodically in SchedulePendingActions()
- m_ActionMapLock.WithExclusiveLock([&] {
- if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
- {
- return;
- }
+ // Extract pending actions under a shared lock — we only need to read
+ // the map and take Ref copies. ActionState() is atomic so this is safe.
+ // Sorting and capacity trimming happen outside the lock to avoid
+ // blocking HTTP handlers on O(N log N) work with large pending queues.
- if (m_PendingActions.empty())
+ m_ActionMapLock.WithSharedLock([&] {
+ if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
{
return;
}
@@ -1610,6 +1638,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
case RunnerAction::State::Completed:
case RunnerAction::State::Failed:
case RunnerAction::State::Abandoned:
+ case RunnerAction::State::Rejected:
case RunnerAction::State::Cancelled:
break;
@@ -1620,30 +1649,30 @@ ComputeServiceSession::Impl::SchedulePendingActions()
}
}
- // Sort by priority descending, then by LSN ascending (FIFO within same priority)
- std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) {
- if (A->Priority != B->Priority)
- {
- return A->Priority > B->Priority;
- }
- return A->ActionLsn < B->ActionLsn;
- });
+ PendingCount = m_PendingActions.size();
+ });
- if (ActionsToSchedule.size() > Capacity)
+ // Sort by priority descending, then by LSN ascending (FIFO within same priority)
+ std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) {
+ if (A->Priority != B->Priority)
{
- ActionsToSchedule.resize(Capacity);
+ return A->Priority > B->Priority;
}
-
- PendingCount = m_PendingActions.size();
+ return A->ActionLsn < B->ActionLsn;
});
+ if (ActionsToSchedule.size() > Capacity)
+ {
+ ActionsToSchedule.resize(Capacity);
+ }
+
if (ActionsToSchedule.empty())
{
_.Dismiss();
return;
}
- ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+ ZEN_DEBUG("attempting schedule of {} pending actions", ActionsToSchedule.size());
Stopwatch SubmitTimer;
std::vector<SubmitResult> SubmitResults = SubmitActions(ActionsToSchedule);
@@ -1663,10 +1692,10 @@ ComputeServiceSession::Impl::SchedulePendingActions()
}
}
- ZEN_INFO("scheduled {} pending actions in {} ({} rejected)",
- ScheduledActionCount,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- NotAcceptedCount);
+ ZEN_DEBUG("scheduled {} pending actions in {} ({} rejected)",
+ ScheduledActionCount,
+ NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+ NotAcceptedCount);
ScheduledCount += ScheduledActionCount;
PendingCount -= ScheduledActionCount;
@@ -1975,6 +2004,14 @@ ComputeServiceSession::Impl::HandleActionUpdates()
break;
}
+ // Rejected — runner was at capacity, reschedule without retry cost
+ case RunnerAction::State::Rejected:
+ {
+ Action->ResetActionStateToPending();
+ ZEN_DEBUG("action {} ({}) rescheduled after runner rejection", Action->ActionId, ActionLsn);
+ break;
+ }
+
// Terminal states — move to results, record history, notify queue
case RunnerAction::State::Completed:
case RunnerAction::State::Failed:
@@ -2009,6 +2046,14 @@ ComputeServiceSession::Impl::HandleActionUpdates()
MaxRetries);
break;
}
+ else
+ {
+ ZEN_WARN("action {} ({}) {} after {} retries, not rescheduling",
+ Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(TerminalState),
+ Action->RetryCount.load(std::memory_order_relaxed));
+ }
}
m_ActionMapLock.WithExclusiveLock([&] {
@@ -2101,10 +2146,9 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>&
ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions");
std::vector<SubmitResult> Results(Actions.size());
- // First try submitting the batch to local runners in parallel
+ // First try submitting the batch to local runners
std::vector<SubmitResult> LocalResults = m_LocalRunnerGroup.SubmitActions(Actions);
- std::vector<size_t> RemoteIndices;
std::vector<Ref<RunnerAction>> RemoteActions;
for (size_t i = 0; i < Actions.size(); ++i)
@@ -2115,20 +2159,40 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>&
}
else
{
- RemoteIndices.push_back(i);
RemoteActions.push_back(Actions[i]);
+ Results[i] = SubmitResult{.IsAccepted = true, .Reason = "dispatched to remote"};
}
}
- // Submit remaining actions to remote runners in parallel
+ // Dispatch remaining actions to remote runners asynchronously.
+ // Mark actions as Submitting so the scheduler won't re-pick them.
+    // The remote runner will transition them to Running on success, or
+    // we mark them Rejected on rejection so HandleActionUpdates reschedules
+    // them without incrementing the retry count
if (!RemoteActions.empty())
{
- std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
-
- for (size_t j = 0; j < RemoteIndices.size(); ++j)
+ for (const Ref<RunnerAction>& Action : RemoteActions)
{
- Results[RemoteIndices[j]] = std::move(RemoteResults[j]);
+ Action->SetActionState(RunnerAction::State::Submitting);
}
+
+ m_RemoteSubmitPool.ScheduleWork(
+ [this, RemoteActions = std::move(RemoteActions)]() {
+ std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
+
+ for (size_t j = 0; j < RemoteResults.size(); ++j)
+ {
+ if (!RemoteResults[j].IsAccepted)
+ {
+ ZEN_DEBUG("remote submission rejected for action {} ({}): {}",
+ RemoteActions[j]->ActionId,
+ RemoteActions[j]->ActionLsn,
+ RemoteResults[j].Reason);
+
+ RemoteActions[j]->SetActionState(RunnerAction::State::Rejected);
+ }
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
return Results;
@@ -2194,16 +2258,22 @@ ComputeServiceSession::NotifyOrchestratorChanged()
m_Impl->NotifyOrchestratorChanged();
}
-void
+bool
ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
{
- m_Impl->StartRecording(InResolver, RecordingPath);
+ return m_Impl->StartRecording(InResolver, RecordingPath);
}
-void
+bool
ComputeServiceSession::StopRecording()
{
- m_Impl->StopRecording();
+ return m_Impl->StopRecording();
+}
+
+bool
+ComputeServiceSession::IsRecording() const
+{
+ return m_Impl->IsRecording();
}
ComputeServiceSession::ActionCounts
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index 6cb975dd3..8cbb25afd 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -62,6 +62,8 @@ struct HttpComputeService::Impl
RwLock m_WsConnectionsLock;
std::vector<Ref<WebSocketConnection>> m_WsConnections;
+ std::function<void()> m_ShutdownCallback;
+
// Metrics
metrics::OperationTiming m_HttpRequests;
@@ -190,6 +192,65 @@ HttpComputeService::Impl::RegisterRoutes()
HttpVerb::kPost);
m_Router.RegisterRoute(
+ "session/drain",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Draining))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "error"sv
+ << "Cannot transition to Draining from current state"sv;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "session/status",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ auto Counts = m_ComputeService.GetActionCounts();
+ Cbo << "actions_pending"sv << Counts.Pending;
+ Cbo << "actions_running"sv << Counts.Running;
+ Cbo << "actions_completed"sv << Counts.Completed;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "session/sunset",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Sunset))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+
+ if (m_ShutdownCallback)
+ {
+ m_ShutdownCallback();
+ }
+ return;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "error"sv
+ << "Cannot transition to Sunset from current state"sv;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"workers",
[this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
HttpVerb::kGet);
@@ -506,9 +567,19 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording");
+ std::filesystem::path RecordingPath = m_BaseDir / "recording";
- return HttpReq.WriteResponse(HttpResponseCode::OK);
+ if (!m_ComputeService.StartRecording(m_CombinedResolver, RecordingPath))
+ {
+ CbObjectWriter Cbo;
+ Cbo << "error"
+ << "recording is already active";
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "path" << RecordingPath.string();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
},
HttpVerb::kPost);
@@ -522,9 +593,19 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StopRecording();
+ std::filesystem::path RecordingPath = m_BaseDir / "recording";
+
+ if (!m_ComputeService.StopRecording())
+ {
+ CbObjectWriter Cbo;
+ Cbo << "error"
+ << "no recording is active";
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
- return HttpReq.WriteResponse(HttpResponseCode::OK);
+ CbObjectWriter Cbo;
+ Cbo << "path" << RecordingPath.string();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
},
HttpVerb::kPost);
@@ -1066,6 +1147,12 @@ HttpComputeService::GetActionCounts()
return m_Impl->m_ComputeService.GetActionCounts();
}
+void
+HttpComputeService::SetShutdownCallback(std::function<void()> Callback)
+{
+ m_Impl->m_ShutdownCallback = std::move(Callback);
+}
+
const char*
HttpComputeService::BaseUri() const
{
diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp
index d92af8716..1f51e560e 100644
--- a/src/zencompute/httporchestrator.cpp
+++ b/src/zencompute/httporchestrator.cpp
@@ -7,6 +7,7 @@
# include <zencompute/orchestratorservice.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/logging.h>
+# include <zencore/session.h>
# include <zencore/string.h>
# include <zencore/system.h>
@@ -77,10 +78,47 @@ ParseWorkerAnnouncement(const CbObjectView& Data, OrchestratorService::WorkerAnn
return Ann.Id;
}
+static OrchestratorService::WorkerAnnotator
+MakeWorkerAnnotator(IProvisionerStateProvider* Prov)
+{
+ if (!Prov)
+ {
+ return {};
+ }
+ return [Prov](std::string_view WorkerId, CbObjectWriter& Cbo) {
+ AgentProvisioningStatus Status = Prov->GetAgentStatus(WorkerId);
+ if (Status != AgentProvisioningStatus::Unknown)
+ {
+ const char* StatusStr = (Status == AgentProvisioningStatus::Draining) ? "draining" : "active";
+ Cbo << "provisioner_status" << std::string_view(StatusStr);
+ }
+ };
+}
+
+bool
+HttpOrchestratorService::ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId)
+{
+ std::string_view SessionStr = Data["coordinator_session"].AsString("");
+ if (SessionStr.empty())
+ {
+ return true; // backwards compatibility: accept announcements without a session
+ }
+ Oid Session = Oid::TryFromHexString(SessionStr);
+ if (Session == m_SessionId)
+ {
+ return true;
+ }
+ ZEN_WARN("rejecting stale announcement from '{}' (session {} != {})", WorkerId, SessionStr, m_SessionId.ToString());
+ return false;
+}
+
HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, bool EnableWorkerWebSocket)
: m_Service(std::make_unique<OrchestratorService>(std::move(DataDir), EnableWorkerWebSocket))
, m_Hostname(GetMachineName())
{
+ m_SessionId = zen::GetSessionId();
+ ZEN_INFO("orchestrator session id: {}", m_SessionId.ToString());
+
m_Router.AddMatcher("workerid", [](std::string_view Segment) { return IsValidWorkerId(Segment); });
m_Router.AddMatcher("clientid", [](std::string_view Segment) { return IsValidWorkerId(Segment); });
@@ -95,13 +133,17 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir,
[this](HttpRouterRequest& Req) {
CbObjectWriter Cbo;
Cbo << "hostname" << std::string_view(m_Hostname);
+ Cbo << "session_id" << m_SessionId.ToString();
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save());
},
HttpVerb::kGet);
m_Router.RegisterRoute(
"provision",
- [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_Service->GetWorkerList()); },
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK,
+ m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire))));
+ },
HttpVerb::kPost);
m_Router.RegisterRoute(
@@ -122,6 +164,11 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir,
"characters and uri must start with http:// or https://");
}
+ if (!ValidateCoordinatorSession(Data, WorkerId))
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, HttpContentType::kText, "Stale coordinator session");
+ }
+
m_Service->AnnounceWorker(Ann);
HttpReq.WriteResponse(HttpResponseCode::OK);
@@ -135,7 +182,10 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir,
m_Router.RegisterRoute(
"agents",
- [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_Service->GetWorkerList()); },
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK,
+ m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire))));
+ },
HttpVerb::kGet);
m_Router.RegisterRoute(
@@ -241,6 +291,59 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir,
},
HttpVerb::kGet);
+ // Provisioner endpoints
+
+ m_Router.RegisterRoute(
+ "provisioner/status",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire))
+ {
+ Cbo << "name" << Prov->GetName();
+ Cbo << "target_cores" << Prov->GetTargetCoreCount();
+ Cbo << "estimated_cores" << Prov->GetEstimatedCoreCount();
+ Cbo << "active_cores" << Prov->GetActiveCoreCount();
+ Cbo << "agents" << Prov->GetAgentCount();
+ Cbo << "agents_draining" << Prov->GetDrainingAgentCount();
+ }
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "provisioner/target",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObject Data = HttpReq.ReadPayloadObject();
+ int32_t Cores = Data["target_cores"].AsInt32(-1);
+
+ ZEN_INFO("provisioner/target: received request (target_cores={}, payload_valid={})", Cores, Data ? true : false);
+
+ if (Cores < 0)
+ {
+ ZEN_WARN("provisioner/target: bad request (target_cores={})", Cores);
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing or invalid target_cores field");
+ }
+
+ IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire);
+ if (!Prov)
+ {
+ ZEN_WARN("provisioner/target: no provisioner configured");
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "No provisioner configured");
+ }
+
+ ZEN_INFO("provisioner/target: setting target to {} cores", Cores);
+ Prov->SetTargetCoreCount(static_cast<uint32_t>(Cores));
+
+ CbObjectWriter Cbo;
+ Cbo << "target_cores" << Prov->GetTargetCoreCount();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
// Client tracking endpoints
m_Router.RegisterRoute(
@@ -411,6 +514,13 @@ HttpOrchestratorService::HandleRequest(HttpServerRequest& Request)
}
}
+void
+HttpOrchestratorService::SetProvisionerStateProvider(IProvisionerStateProvider* Provider)
+{
+ m_Provisioner.store(Provider, std::memory_order_release);
+ m_Service->SetProvisionerStateProvider(Provider);
+}
+
//////////////////////////////////////////////////////////////////////////
//
// IWebSocketHandler
@@ -488,6 +598,11 @@ HttpOrchestratorService::HandleWorkerWebSocketMessage(const WebSocketMessage& Ms
return {};
}
+ if (!ValidateCoordinatorSession(Data, WorkerId))
+ {
+ return {};
+ }
+
m_Service->AnnounceWorker(Ann);
return std::string(WorkerId);
}
@@ -563,7 +678,7 @@ HttpOrchestratorService::PushThreadFunction()
}
// Build combined JSON with worker list, provisioning history, clients, and client history
- CbObject WorkerList = m_Service->GetWorkerList();
+ CbObject WorkerList = m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire)));
CbObject History = m_Service->GetProvisioningHistory(50);
CbObject ClientList = m_Service->GetClientList();
CbObject ClientHistory = m_Service->GetClientHistory(50);
@@ -615,6 +730,20 @@ HttpOrchestratorService::PushThreadFunction()
JsonBuilder.Append(ClientHistoryJsonView.substr(1, ClientHistoryJsonView.size() - 2));
}
+ // Emit provisioner stats if available
+ if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire))
+ {
+ JsonBuilder.Append(
+ fmt::format(",\"provisioner\":{{\"name\":\"{}\",\"target_cores\":{},\"estimated_cores\":{}"
+ ",\"active_cores\":{},\"agents\":{},\"agents_draining\":{}}}",
+ Prov->GetName(),
+ Prov->GetTargetCoreCount(),
+ Prov->GetEstimatedCoreCount(),
+ Prov->GetActiveCoreCount(),
+ Prov->GetAgentCount(),
+ Prov->GetDrainingAgentCount()));
+ }
+
JsonBuilder.Append("}");
std::string_view Json = JsonBuilder.ToView();
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h
index ad556f546..97de4321a 100644
--- a/src/zencompute/include/zencompute/computeservice.h
+++ b/src/zencompute/include/zencompute/computeservice.h
@@ -279,7 +279,7 @@ public:
// sized to match RunnerAction::State::_Count but we can't use the enum here
// for dependency reasons, so just use a fixed size array and static assert in
// the implementation file
- uint64_t Timestamps[9] = {};
+ uint64_t Timestamps[10] = {};
};
[[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
@@ -305,8 +305,9 @@ public:
// Recording
- void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
- void StopRecording();
+ bool StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+ bool StopRecording();
+ bool IsRecording() const;
private:
void PostUpdate(RunnerAction* Action);
diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h
index db3fce3c2..32f54f293 100644
--- a/src/zencompute/include/zencompute/httpcomputeservice.h
+++ b/src/zencompute/include/zencompute/httpcomputeservice.h
@@ -35,6 +35,10 @@ public:
void Shutdown();
+ /** Set a callback to be invoked when the session/sunset endpoint is hit.
+ * Typically wired to HttpServer::RequestExit() to shut down the process. */
+ void SetShutdownCallback(std::function<void()> Callback);
+
[[nodiscard]] ComputeServiceSession::ActionCounts GetActionCounts();
const char* BaseUri() const override;
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
index 58b2c9152..ef0a1269a 100644
--- a/src/zencompute/include/zencompute/httporchestrator.h
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -2,10 +2,12 @@
#pragma once
+#include <zencompute/provisionerstate.h>
#include <zencompute/zencompute.h>
#include <zencore/logging.h>
#include <zencore/thread.h>
+#include <zencore/uid.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/websocket.h>
@@ -65,6 +67,16 @@ public:
*/
void Shutdown();
+ /** Return the session ID generated at construction time. Provisioners
+ * pass this to spawned workers so the orchestrator can reject stale
+ * announcements from previous sessions. */
+ Oid GetSessionId() const { return m_SessionId; }
+
+ /** Register a provisioner whose target core count can be read and changed
+ * via the orchestrator HTTP API and dashboard. Caller retains ownership;
+ * the provider must outlive this service. */
+ void SetProvisionerStateProvider(IProvisionerStateProvider* Provider);
+
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
@@ -81,6 +93,11 @@ private:
std::unique_ptr<OrchestratorService> m_Service;
std::string m_Hostname;
+ Oid m_SessionId;
+ bool ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId);
+
+ std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr};
+
// WebSocket push
#if ZEN_WITH_WEBSOCKETS
diff --git a/src/zencompute/include/zencompute/orchestratorservice.h b/src/zencompute/include/zencompute/orchestratorservice.h
index 549ff8e3c..2c49e22df 100644
--- a/src/zencompute/include/zencompute/orchestratorservice.h
+++ b/src/zencompute/include/zencompute/orchestratorservice.h
@@ -6,6 +6,7 @@
#if ZEN_WITH_COMPUTE_SERVICES
+# include <zencompute/provisionerstate.h>
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/logbase.h>
@@ -90,9 +91,16 @@ public:
std::string Hostname;
};
- CbObject GetWorkerList();
+ /** Per-worker callback invoked during GetWorkerList serialization.
+ * The callback receives the worker ID and a CbObjectWriter positioned
+ * inside the worker's object, allowing the caller to append extra fields. */
+ using WorkerAnnotator = std::function<void(std::string_view WorkerId, CbObjectWriter& Cbo)>;
+
+ CbObject GetWorkerList(const WorkerAnnotator& Annotate = {});
void AnnounceWorker(const WorkerAnnouncement& Announcement);
+ void SetProvisionerStateProvider(IProvisionerStateProvider* Provider);
+
bool IsWorkerWebSocketEnabled() const;
void SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected);
@@ -171,6 +179,8 @@ private:
LoggerRef m_Log{"compute.orchestrator"};
bool m_EnableWorkerWebSocket = false;
+ std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr};
+
std::thread m_ProbeThread;
std::atomic<bool> m_ProbeThreadEnabled{true};
Event m_ProbeThreadEvent;
diff --git a/src/zencompute/include/zencompute/provisionerstate.h b/src/zencompute/include/zencompute/provisionerstate.h
new file mode 100644
index 000000000..e9af8a635
--- /dev/null
+++ b/src/zencompute/include/zencompute/provisionerstate.h
@@ -0,0 +1,38 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+
+namespace zen::compute {
+
+/** Per-agent provisioning status as seen by the provisioner. */
+enum class AgentProvisioningStatus
+{
+	Unknown,   ///< Not known to the provisioner
+	Active,	   ///< Running and allocated
+	Draining,  ///< Being gracefully deprovisioned
+};
+
+/** Abstract interface for querying and controlling a provisioner from the HTTP layer.
+ * This decouples the orchestrator service from specific provisioner implementations.
+ *
+ * NOTE(review): the orchestrator stores a pointer to this interface in an atomic and
+ * calls it from both the HTTP request path and its probe thread, so implementations
+ * should be safe for concurrent calls — confirm each implementation. */
+class IProvisionerStateProvider
+{
+public:
+	virtual ~IProvisionerStateProvider() = default;
+
+	virtual std::string_view GetName() const = 0;	 ///< e.g. "horde", "nomad"
+	virtual uint32_t		 GetTargetCoreCount() const = 0;	///< Desired core count requested of the provisioner
+	virtual uint32_t		 GetEstimatedCoreCount() const = 0; ///< presumably includes cores still being provisioned — TODO confirm
+	virtual uint32_t		 GetActiveCoreCount() const = 0;	///< presumably cores currently allocated and running — TODO confirm
+	virtual uint32_t		 GetAgentCount() const = 0;
+	virtual uint32_t		 GetDrainingAgentCount() const { return 0; } ///< Optional; defaults to "none draining"
+	virtual void			 SetTargetCoreCount(uint32_t Count) = 0;
+
+	/** Return the provisioning status for a worker by its orchestrator ID
+	 * (e.g. "horde-{LeaseId}"). Returns Unknown if the ID is not recognized. */
+	virtual AgentProvisioningStatus GetAgentStatus(std::string_view /*WorkerId*/) const { return AgentProvisioningStatus::Unknown; }
+};
+
+} // namespace zen::compute
diff --git a/src/zencompute/orchestratorservice.cpp b/src/zencompute/orchestratorservice.cpp
index 9ea695305..aee8fa63a 100644
--- a/src/zencompute/orchestratorservice.cpp
+++ b/src/zencompute/orchestratorservice.cpp
@@ -31,7 +31,7 @@ OrchestratorService::~OrchestratorService()
}
CbObject
-OrchestratorService::GetWorkerList()
+OrchestratorService::GetWorkerList(const WorkerAnnotator& Annotate)
{
ZEN_TRACE_CPU("OrchestratorService::GetWorkerList");
CbObjectWriter Cbo;
@@ -71,6 +71,10 @@ OrchestratorService::GetWorkerList()
Cbo << "ws_connected" << true;
}
Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs();
+ if (Annotate)
+ {
+ Annotate(WorkerId, Cbo);
+ }
Cbo.EndObject();
}
});
@@ -144,6 +148,12 @@ OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann)
}
}
+/** Register (or clear, by passing nullptr) the provisioner exposed to this
+ * service. Caller retains ownership; the provider must outlive this service.
+ * The release store pairs with the acquire load performed on the probe
+ * thread when checking whether an unreachable worker is draining. */
+void
+OrchestratorService::SetProvisionerStateProvider(IProvisionerStateProvider* Provider)
+{
+	m_Provisioner.store(Provider, std::memory_order_release);
+}
+
bool
OrchestratorService::IsWorkerWebSocketEnabled() const
{
@@ -607,6 +617,14 @@ OrchestratorService::ProbeThreadFunction()
continue;
}
+ // Check if the provisioner knows this worker is draining — if so,
+ // unreachability is expected and should not be logged as a warning.
+ bool IsDraining = false;
+ if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire))
+ {
+ IsDraining = Prov->GetAgentStatus(Snap.Id) == AgentProvisioningStatus::Draining;
+ }
+
ReachableState NewState = ReachableState::Unreachable;
try
@@ -621,7 +639,10 @@ OrchestratorService::ProbeThreadFunction()
}
catch (const std::exception& Ex)
{
- ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
+ if (!IsDraining)
+ {
+ ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
+ }
}
ReachableState PrevState = ReachableState::Unknown;
@@ -646,6 +667,10 @@ OrchestratorService::ProbeThreadFunction()
{
ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri);
}
+ else if (IsDraining)
+ {
+ ZEN_INFO("worker {} ({}) shut down (draining)", Snap.Id, Snap.Uri);
+ }
else if (PrevState == ReachableState::Reachable)
{
ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri);
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 67e12b84e..ab22c6363 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,9 +6,15 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/string.h>
+# include <zencore/timer.h>
# include <zencore/trace.h>
+# include <zencore/workthreadpool.h>
# include <fmt/format.h>
+# include <future>
# include <vector>
namespace zen::compute {
@@ -118,23 +124,34 @@ std::vector<SubmitResult>
BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
- RwLock::SharedLockScope _(m_RunnersLock);
- const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ // Snapshot runners and query capacity under the lock, then release
+ // before submitting — HTTP submissions to remote runners can take
+ // hundreds of milliseconds and we must not hold m_RunnersLock during I/O.
- if (RunnerCount == 0)
- {
- return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
- }
+ std::vector<Ref<FunctionRunner>> Runners;
+ std::vector<size_t> Capacities;
+ std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions;
+ size_t TotalCapacity = 0;
- // Query capacity per runner and compute total
- std::vector<size_t> Capacities(RunnerCount);
- size_t TotalCapacity = 0;
+ m_RunnersLock.WithSharedLock([&] {
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ Runners.assign(m_Runners.begin(), m_Runners.end());
+ Capacities.resize(RunnerCount);
+ PerRunnerActions.resize(RunnerCount);
- for (int i = 0; i < RunnerCount; ++i)
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ Capacities[i] = Runners[i]->QueryCapacity();
+ TotalCapacity += Capacities[i];
+ }
+ });
+
+ const int RunnerCount = gsl::narrow<int>(Runners.size());
+
+ if (RunnerCount == 0)
{
- Capacities[i] = m_Runners[i]->QueryCapacity();
- TotalCapacity += Capacities[i];
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
}
if (TotalCapacity == 0)
@@ -143,9 +160,8 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
// Distribute actions across runners proportionally to their available capacity
- std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
- std::vector<size_t> ActionRunnerIndex(Actions.size());
- size_t ActionIdx = 0;
+ std::vector<size_t> ActionRunnerIndex(Actions.size());
+ size_t ActionIdx = 0;
for (int i = 0; i < RunnerCount; ++i)
{
@@ -176,14 +192,74 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Submit batches per runner
+ // Submit batches per runner — in parallel when a worker pool is available
+
std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+ int ActiveRunnerCount = 0;
for (int i = 0; i < RunnerCount; ++i)
{
if (!PerRunnerActions[i].empty())
{
- PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+ ++ActiveRunnerCount;
+ }
+ }
+
+ static constexpr uint64_t SubmitWarnThresholdMs = 500;
+
+ auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
+ Stopwatch Timer;
+
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
+
+ uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
+ if (ElapsedMs >= SubmitWarnThresholdMs)
+ {
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
+ PerRunnerActions[RunnerIndex].size(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
+ ElapsedMs);
+ }
+ };
+
+ if (m_WorkerPool && ActiveRunnerCount > 1)
+ {
+ std::vector<std::future<void>> Futures(RunnerCount);
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ std::packaged_task<void()> Task([&SubmitToRunner, i]() { SubmitToRunner(i); });
+
+ Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog);
+ }
+ }
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (Futures[i].valid())
+ {
+ Futures[i].get();
+ }
+ }
+ }
+ else
+ {
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ SubmitToRunner(i);
+ }
}
}
@@ -309,10 +385,11 @@ RunnerAction::RetractAction()
bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed, Abandoned, or Retracted states
+ // Only allow reset from Failed, Abandoned, Rejected, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Rejected &&
+ CurrentState != State::Retracted)
{
return false;
}
@@ -333,11 +410,12 @@ RunnerAction::ResetActionStateToPending()
// Clear execution fields
ExecutionLocation.clear();
+ FailureReason.clear();
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count (skip for Retracted — nothing failed)
- if (CurrentState != State::Retracted)
+ // Increment retry count (skip for Retracted/Rejected — nothing failed)
+ if (CurrentState != State::Retracted && CurrentState != State::Rejected)
{
RetryCount.fetch_add(1, std::memory_order_relaxed);
}
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index 56c3f3af0..449f0e228 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -10,6 +10,10 @@
# include <filesystem>
# include <vector>
+namespace zen {
+class WorkerThreadPool;
+}
+
namespace zen::compute {
struct SubmitResult
@@ -37,6 +41,22 @@ public:
[[nodiscard]] virtual bool IsHealthy() = 0;
[[nodiscard]] virtual size_t QueryCapacity();
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; }
+
+ // Accumulated stats from the most recent SubmitActions call.
+ // Reset before each call, populated by the runner implementation.
+ struct SubmitStats
+ {
+ std::atomic<size_t> TotalAttachments{0};
+ std::atomic<uint64_t> TotalAttachmentBytes{0};
+
+ void Reset()
+ {
+ TotalAttachments.store(0, std::memory_order_relaxed);
+ TotalAttachmentBytes.store(0, std::memory_order_relaxed);
+ }
+ };
+ SubmitStats m_LastSubmitStats;
// Best-effort cancellation of a specific in-flight action. Returns true if the
// cancellation signal was successfully sent. The action will transition to Cancelled
@@ -68,6 +88,8 @@ public:
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
+ void SetWorkerPool(WorkerThreadPool* Pool) { m_WorkerPool = Pool; }
+
size_t GetRunnerCount()
{
return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); });
@@ -79,6 +101,7 @@ protected:
RwLock m_RunnersLock;
std::vector<Ref<FunctionRunner>> m_Runners;
std::atomic<int> m_NextSubmitIndex{0};
+ WorkerThreadPool* m_WorkerPool = nullptr;
};
/** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal.
@@ -151,6 +174,7 @@ struct RunnerAction : public RefCounted
CbObject ActionObj;
int Priority = 0;
std::string ExecutionLocation; // "local" or remote hostname
+ std::string FailureReason; // human-readable reason when action fails (empty on success)
// CPU usage and total CPU time of the running process, sampled periodically by the local runner.
// CpuUsagePercent: -1.0 means not yet sampled; >=0.0 is the most recent reading as a percentage.
@@ -168,6 +192,7 @@ struct RunnerAction : public RefCounted
Completed, // Finished successfully with results available
Failed, // Execution failed (transient error, eligible for retry)
Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon)
+ Rejected, // Runner declined (e.g. at capacity) — rescheduled without retry cost
Cancelled, // Intentional user cancellation (never retried)
Retracted, // Pulled back for rescheduling on a different runner (no retry cost)
_Count
@@ -194,6 +219,8 @@ struct RunnerAction : public RefCounted
return "Failed";
case State::Abandoned:
return "Abandoned";
+ case State::Rejected:
+ return "Rejected";
case State::Cancelled:
return "Cancelled";
case State::Retracted:
diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp
index 9055005d9..ce5bbdcc8 100644
--- a/src/zencompute/runners/linuxrunner.cpp
+++ b/src/zencompute/runners/linuxrunner.cpp
@@ -430,7 +430,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process — lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
@@ -481,7 +482,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index 1b748c0e5..96cbdc134 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -357,14 +357,21 @@ LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
- if (!std::filesystem::exists(WorkerDir))
+ // worker.zcb is written as the last step of ManifestWorker, so its presence
+ // indicates a complete manifest. If the directory exists but the marker is
+ // missing, a previous manifest was interrupted and we need to start over.
+ bool NeedsManifest = !std::filesystem::exists(WorkerDir / "worker.zcb");
+
+ if (NeedsManifest)
{
_.ReleaseNow();
RwLock::ExclusiveLockScope $(m_WorkerLock);
- if (!std::filesystem::exists(WorkerDir))
+ if (!std::filesystem::exists(WorkerDir / "worker.zcb"))
{
+ std::error_code Ec;
+ std::filesystem::remove_all(WorkerDir, Ec);
ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
}
}
@@ -673,9 +680,15 @@ LocalProcessRunner::ProcessCompletedActions(std::vector<Ref<RunningAction>>& Com
}
catch (std::exception& Ex)
{
- ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+ Running->Action->FailureReason = fmt::format("exception gathering outputs: {}", Ex.what());
+ ZEN_ERROR("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
}
}
+ else
+ {
+ Running->Action->FailureReason = fmt::format("process exited with code {}", Running->ExitCode);
+ ZEN_WARN("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason);
+ }
// Failed - clean up the sandbox in the background.
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp
index c2ccca9a6..13c01d988 100644
--- a/src/zencompute/runners/macrunner.cpp
+++ b/src/zencompute/runners/macrunner.cpp
@@ -211,7 +211,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process — lower priority so workers don't starve the main server
+ nice(5);
if (m_Sandboxed)
{
@@ -281,7 +282,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action)
// Clean up the sandbox in the background
m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath));
- ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf);
+ Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf);
+ ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason);
Action->SetActionState(RunnerAction::State::Failed);
return SubmitResult{.IsAccepted = false};
diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp
index e4a7ba388..a4f586852 100644
--- a/src/zencompute/runners/managedrunner.cpp
+++ b/src/zencompute/runners/managedrunner.cpp
@@ -128,7 +128,7 @@ ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action)
CreateProcOptions Options;
Options.WorkingDirectory = &Prepared->SandboxPath;
- Options.Flags = CreateProcOptions::Flag_NoConsole;
+ Options.Flags = CreateProcOptions::Flag_NoConsole | CreateProcOptions::Flag_BelowNormalPriority;
Options.Environment = std::move(EnvPairs);
const int32_t ActionLsn = Prepared->ActionLsn;
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index ce6a81173..55f78fdd6 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -20,6 +20,7 @@
# include <zenstore/cidstore.h>
# include <span>
+# include <unordered_set>
//////////////////////////////////////////////////////////////////////////
@@ -38,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
+, m_DisplayName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
@@ -59,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+/** Augment this runner's display name with the remote's self-reported
+ * hostname, yielding "connection-host (remote-host)". An empty Hostname is
+ * a no-op, leaving the default display name (the connection hostname set in
+ * the constructor) unchanged. Presumably surfaced through GetDisplayName()
+ * in slow-submit warnings — confirm the override, which is not visible here. */
+void
+RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname)
+{
+	if (!Hostname.empty())
+	{
+		m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname);
+	}
+}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
Shutdown();
@@ -108,6 +119,7 @@ RemoteHttpRunner::Shutdown()
for (auto& [RemoteLsn, HttpAction] : Remaining)
{
ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+ HttpAction.Action->FailureReason = "remote runner shutdown";
HttpAction.Action->SetActionState(RunnerAction::State::Failed);
}
}
@@ -213,11 +225,13 @@ RemoteHttpRunner::QueryCapacity()
return 0;
}
- // Estimate how much more work we're ready to accept
+ // Estimate how much more work we're ready to accept.
+ // Include actions currently being submitted over HTTP so we don't
+ // keep queueing new submissions while previous ones are still in flight.
RwLock::SharedLockScope _{m_RunningLock};
- size_t RunningCount = m_RemoteRunningMap.size();
+ size_t RunningCount = m_RemoteRunningMap.size() + m_InFlightSubmissions.load(std::memory_order_relaxed);
if (RunningCount >= size_t(m_MaxRunningActions))
{
@@ -232,6 +246,9 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions");
+ m_InFlightSubmissions.fetch_add(Actions.size(), std::memory_order_relaxed);
+ auto InFlightGuard = MakeGuard([&] { m_InFlightSubmissions.fetch_sub(Actions.size(), std::memory_order_relaxed); });
+
if (Actions.size() <= 1)
{
std::vector<SubmitResult> Results;
@@ -359,108 +376,141 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
}
}
- // Enqueue job. If the remote returns FailedDependency (424), it means it
- // cannot resolve the worker/function — re-register the worker and retry once.
+ // Submit the action to the remote. In eager-attach mode we build a
+ // CbPackage with all referenced attachments upfront to avoid the 404
+ // round-trip. In the default mode we POST the bare object first and
+ // only upload missing attachments if the remote requests them.
+ //
+ // In both modes, FailedDependency (424) triggers a worker re-register
+ // and a single retry.
CbObject Result;
HttpClient::Response WorkResponse;
HttpResponseCode WorkResponseCode{};
- for (int Attempt = 0; Attempt < 2; ++Attempt)
- {
- WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
- WorkResponseCode = WorkResponse.StatusCode;
-
- if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
- {
- ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
- m_Http.GetBaseUri(),
- ActionId);
-
- (void)RegisterWorker(Action->Worker.Descriptor);
- }
- else
- {
- break;
- }
- }
-
- if (WorkResponseCode == HttpResponseCode::OK)
- {
- Result = WorkResponse.AsObject();
- }
- else if (WorkResponseCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Not all attachments are present
-
- // Build response package including all required attachments
-
CbPackage Pkg;
Pkg.SetObject(ActionObj);
- CbObject Response = WorkResponse.AsObject();
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash AttachHash = Field.AsHash();
- for (auto& Item : Response["need"sv])
- {
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ });
+
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, Pkg);
+ WorkResponseCode = WorkResponse.StatusCode;
+
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
- // No such attachment
-
- return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ break;
}
}
+ }
+ else
+ {
+ for (int Attempt = 0; Attempt < 2; ++Attempt)
+ {
+ WorkResponse = m_Http.Post(SubmitUrl, ActionObj);
+ WorkResponseCode = WorkResponse.StatusCode;
- // Post resulting package
+ if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0)
+ {
+ ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying",
+ m_Http.GetBaseUri(),
+ ActionId);
- HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+ (void)RegisterWorker(Action->Worker.Descriptor);
+ }
+ else
+ {
+ break;
+ }
+ }
- if (!PayloadResponse)
+ if (WorkResponseCode == HttpResponseCode::NotFound)
{
- ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ // Remote needs attachments — resolve them and retry with a CbPackage
- // TODO: include more information about the failure in the response
+ CbPackage Pkg;
+ Pkg.SetObject(ActionObj);
- return {.IsAccepted = false, .Reason = "HTTP request failed"};
- }
- else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
- {
- Result = PayloadResponse.AsObject();
- }
- else
- {
- // Unexpected response
-
- const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
-
- ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})",
- ActionId,
- m_Http.GetBaseUri(),
- SubmitUrl,
- ResponseStatusCode,
- ToString(ResponseStatusCode));
-
- return {.IsAccepted = false,
- .Reason = fmt::format("unexpected response code {} {} from {}{}",
- ResponseStatusCode,
- ToString(ResponseStatusCode),
- m_Http.GetBaseUri(),
- SubmitUrl)};
+ CbObject Response = WorkResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ }
+ }
+
+ HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (!PayloadResponse)
+ {
+ ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+
+ WorkResponse = std::move(PayloadResponse);
+ WorkResponseCode = WorkResponse.StatusCode;
}
}
+ if (WorkResponseCode == HttpResponseCode::OK)
+ {
+ Result = WorkResponse.AsObject();
+ }
+ else if (!WorkResponse)
+ {
+ ZEN_WARN("submit of action {} to {}{} failed", ActionId, m_Http.GetBaseUri(), SubmitUrl);
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+ else if (!IsHttpSuccessCode(WorkResponseCode))
+ {
+ const int Code = static_cast<int>(WorkResponseCode);
+ ZEN_WARN("submit of action {} to {}{} returned {} {}", ActionId, m_Http.GetBaseUri(), SubmitUrl, Code, ToString(Code));
+ return {.IsAccepted = false,
+ .Reason = fmt::format("unexpected response code {} {} from {}{}", Code, ToString(Code), m_Http.GetBaseUri(), SubmitUrl)};
+ }
+
if (Result)
{
if (const int32_t LsnField = Result["lsn"].AsInt32(0))
@@ -512,82 +562,110 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
CbObjectWriter Body;
Body.BeginArray("actions"sv);
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsSeen;
+
for (const Ref<RunnerAction>& Action : Actions)
{
Action->ExecutionLocation = m_HostName;
MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
Body.AddObject(Action->ActionObj);
+
+ if (m_EagerAttach)
+ {
+ Action->ActionObj.IterateAttachments([&](CbFieldView Field) { AttachmentsSeen.insert(Field.AsHash()); });
+ }
}
Body.EndArray();
- // POST the batch
-
- HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
-
- if (Response.StatusCode == HttpResponseCode::OK)
- {
- return ParseBatchResponse(Response, Actions);
- }
+ // In eager-attach mode, build a CbPackage with all referenced attachments
+ // so the remote can accept in a single round-trip. Otherwise POST a bare
+ // CbObject and handle the 404 need-list flow.
- if (Response.StatusCode == HttpResponseCode::NotFound)
+ if (m_EagerAttach)
{
- // Server needs attachments — resolve them and retry with a CbPackage
-
- CbObject NeedObj = Response.AsObject();
-
CbPackage Pkg;
Pkg.SetObject(Body.Save());
- for (auto& Item : NeedObj["need"sv])
+ for (const IoHash& AttachHash : AttachmentsSeen)
{
- const IoHash NeedHash = Item.AsHash();
-
- if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash))
{
uint64_t DataRawSize = 0;
IoHash DataRawHash;
CompressedBuffer Compressed =
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
- ZEN_ASSERT(DataRawHash == NeedHash);
-
- Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
- }
- else
- {
- ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
- return FallbackToIndividualSubmit(Actions);
+ Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
}
- HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Pkg);
+
+ if (Response.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(Response, Actions);
+ }
+ }
+ else
+ {
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
- if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ if (Response.StatusCode == HttpResponseCode::OK)
{
- return ParseBatchResponse(RetryResponse, Actions);
+ return ParseBatchResponse(Response, Actions);
}
- ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
- (int)RetryResponse.StatusCode,
- ToString(RetryResponse.StatusCode));
- return FallbackToIndividualSubmit(Actions);
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ CbObject NeedObj = Response.AsObject();
+
+ CbPackage Pkg;
+ Pkg.SetObject(Body.Save());
+
+ for (auto& Item : NeedObj["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
+ }
+ else
+ {
+ ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
+ return FallbackToIndividualSubmit(Actions);
+ }
+ }
+
+ HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(RetryResponse, Actions);
+ }
+
+ ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
+ (int)RetryResponse.StatusCode,
+ ToString(RetryResponse.StatusCode));
+ return FallbackToIndividualSubmit(Actions);
+ }
}
// Unexpected status or connection error — fall back to individual submission
- if (Response)
- {
- ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
- m_Http.GetBaseUri(),
- SubmitUrl,
- (int)Response.StatusCode,
- ToString(Response.StatusCode));
- }
- else
- {
- ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
- }
+ ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
return FallbackToIndividualSubmit(Actions);
}
@@ -869,9 +947,10 @@ RemoteHttpRunner::SweepRunningActions()
{
for (auto& FieldIt : Completed["completed"sv])
{
- CbObjectView EntryObj = FieldIt.AsObjectView();
- const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
- std::string_view StateName = EntryObj["state"sv].AsString();
+ CbObjectView EntryObj = FieldIt.AsObjectView();
+ const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32();
+ std::string_view StateName = EntryObj["state"sv].AsString();
+ std::string_view FailureReason = EntryObj["reason"sv].AsString();
RunnerAction::State RemoteState = RunnerAction::FromString(StateName);
@@ -884,6 +963,7 @@ RemoteHttpRunner::SweepRunningActions()
{
HttpRunningAction CompletedAction = std::move(CompleteIt->second);
CompletedAction.RemoteState = RemoteState;
+ CompletedAction.FailureReason = std::string(FailureReason);
if (RemoteState == RunnerAction::State::Completed && ResponseJob)
{
@@ -927,16 +1007,44 @@ RemoteHttpRunner::SweepRunningActions()
{
const int ActionLsn = HttpAction.Action->ActionLsn;
- ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
- HttpAction.Action->ActionId,
- ActionLsn,
- HttpAction.RemoteActionLsn,
- RunnerAction::ToString(HttpAction.RemoteState));
-
if (HttpAction.RemoteState == RunnerAction::State::Completed)
{
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) completed on {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ m_HostName);
HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
}
+ else if (HttpAction.RemoteState == RunnerAction::State::Failed || HttpAction.RemoteState == RunnerAction::State::Abandoned)
+ {
+ HttpAction.Action->FailureReason = HttpAction.FailureReason;
+ if (HttpAction.FailureReason.empty())
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName);
+ }
+ else
+ {
+ ZEN_WARN("action {} ({}) {} on remote {}: {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState),
+ m_HostName,
+ HttpAction.FailureReason);
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
+ HttpAction.Action->ActionId,
+ ActionLsn,
+ HttpAction.RemoteActionLsn,
+ RunnerAction::ToString(HttpAction.RemoteState));
+ }
HttpAction.Action->SetActionState(HttpAction.RemoteState);
}
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index c17d0cf2a..fdf113c77 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,8 +54,10 @@ public:
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
virtual void CancelRemoteQueue(int QueueId) override;
+ [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }
std::string_view GetHostName() const { return m_HostName; }
+ void SetRemoteHostname(std::string_view Hostname);
protected:
LoggerRef Log() { return m_Log; }
@@ -65,12 +67,15 @@ private:
ChunkResolver& m_ChunkResolver;
WorkerThreadPool& m_WorkerPool;
std::string m_HostName;
+ std::string m_DisplayName;
std::string m_BaseUrl;
HttpClient m_Http;
- std::atomic<bool> m_AcceptNewActions{true};
- int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
- int32_t m_MaxBatchSize = 50;
+ std::atomic<bool> m_AcceptNewActions{true};
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ int32_t m_MaxBatchSize = 50;
+ bool m_EagerAttach = true; ///< Send attachments with every submit instead of the two-step 404 retry
+ std::atomic<size_t> m_InFlightSubmissions{0}; // actions currently being submitted over HTTP
struct HttpRunningAction
{
@@ -78,6 +83,7 @@ private:
int RemoteActionLsn = 0; // Remote LSN
RunnerAction::State RemoteState = RunnerAction::State::Failed;
CbPackage ActionResults;
+ std::string FailureReason;
};
RwLock m_RunningLock;
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index 92ee65c2d..e643c9ce8 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -48,7 +48,9 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
if (m_JobObject)
{
JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{};
- ExtLimits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION;
+ ExtLimits.BasicLimitInformation.LimitFlags =
+ JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION | JOB_OBJECT_LIMIT_PRIORITY_CLASS;
+ ExtLimits.BasicLimitInformation.PriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits));
JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{};
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
index b4fafb467..593b19e55 100644
--- a/src/zencompute/runners/winerunner.cpp
+++ b/src/zencompute/runners/winerunner.cpp
@@ -96,7 +96,9 @@ WineProcessRunner::SubmitAction(Ref<RunnerAction> Action)
if (ChildPid == 0)
{
- // Child process
+ // Child process — lower priority so workers don't starve the main server
+ nice(5);
+
if (chdir(SandboxPathStr.c_str()) != 0)
{
_exit(127);