aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/computeservice.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 16:38:16 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 16:38:16 +0200
commit795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch)
tree7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute/computeservice.cpp
parent5.8.4-pre2 (diff)
downloadzen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz
zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler 
lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zencompute/computeservice.cpp')
-rw-r--r--src/zencompute/computeservice.cpp164
1 file changed, 117 insertions, 47 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index aaf34cbe2..852e93fa0 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -121,6 +121,8 @@ struct ComputeServiceSession::Impl
, m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
, m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
{
+ m_RemoteRunnerGroup.SetWorkerPool(&m_RemoteSubmitPool);
+
// Create a non-expiring, non-deletable implicit queue for legacy endpoints
auto Result = CreateQueue("implicit"sv, {}, {});
m_ImplicitQueueId = Result.QueueId;
@@ -240,8 +242,9 @@ struct ComputeServiceSession::Impl
// Recording
- void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
- void StopRecording();
+ bool StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+ bool StopRecording();
+ bool IsRecording() const;
std::unique_ptr<ActionRecorder> m_Recorder;
@@ -615,6 +618,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState()
m_KnownWorkerUris.insert(UriStr);
auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool);
+ NewRunner->SetRemoteHostname(Hostname);
SyncWorkersToRunner(*NewRunner);
m_RemoteRunnerGroup.AddRunner(NewRunner);
}
@@ -716,24 +720,44 @@ ComputeServiceSession::Impl::ShutdownRunners()
m_RemoteRunnerGroup.Shutdown();
}
-void
+bool
ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
{
+ if (m_Recorder)
+ {
+ ZEN_WARN("recording is already active");
+ return false;
+ }
+
ZEN_INFO("starting recording to '{}'", RecordingPath);
m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
ZEN_INFO("started recording to '{}'", RecordingPath);
+ return true;
}
-void
+bool
ComputeServiceSession::Impl::StopRecording()
{
+ if (!m_Recorder)
+ {
+ ZEN_WARN("no recording is active");
+ return false;
+ }
+
ZEN_INFO("stopping recording");
m_Recorder = nullptr;
ZEN_INFO("stopped recording");
+ return true;
+}
+
+bool
+ComputeServiceSession::Impl::IsRecording() const
+{
+ return m_Recorder != nullptr;
}
std::vector<ComputeServiceSession::RunningActionInfo>
@@ -1128,6 +1152,10 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo)
Cbo.BeginObject();
Cbo << "lsn"sv << Lsn;
Cbo << "state"sv << RunnerAction::ToString(Action->ActionState());
+ if (!Action->FailureReason.empty())
+ {
+ Cbo << "reason"sv << Action->FailureReason;
+ }
Cbo.EndObject();
}
});
@@ -1416,8 +1444,8 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo)
if (Queue)
{
- Queue->m_Lock.WithSharedLock([&] {
- m_ActionMapLock.WithSharedLock([&] {
+ m_ActionMapLock.WithSharedLock([&] {
+ Queue->m_Lock.WithSharedLock([&] {
for (int Lsn : Queue->FinishedLsns)
{
if (m_ResultsMap.contains(Lsn))
@@ -1530,12 +1558,12 @@ ComputeServiceSession::Impl::SchedulePendingActions()
static Stopwatch DumpRunningTimer;
auto _ = MakeGuard([&] {
- ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
- ScheduledCount,
- RunningCount,
- m_RetiredCount.load(),
- PendingCount,
- ResultCount);
+ ZEN_DEBUG("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+ ScheduledCount,
+ RunningCount,
+ m_RetiredCount.load(),
+ PendingCount,
+ ResultCount);
if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
{
@@ -1584,13 +1612,13 @@ ComputeServiceSession::Impl::SchedulePendingActions()
// Also note that the m_PendingActions list is not maintained
// here, that's done periodically in SchedulePendingActions()
- m_ActionMapLock.WithExclusiveLock([&] {
- if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
- {
- return;
- }
+ // Extract pending actions under a shared lock — we only need to read
+ // the map and take Ref copies. ActionState() is atomic so this is safe.
+ // Sorting and capacity trimming happen outside the lock to avoid
+ // blocking HTTP handlers on O(N log N) work with large pending queues.
- if (m_PendingActions.empty())
+ m_ActionMapLock.WithSharedLock([&] {
+ if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
{
return;
}
@@ -1610,6 +1638,7 @@ ComputeServiceSession::Impl::SchedulePendingActions()
case RunnerAction::State::Completed:
case RunnerAction::State::Failed:
case RunnerAction::State::Abandoned:
+ case RunnerAction::State::Rejected:
case RunnerAction::State::Cancelled:
break;
@@ -1620,30 +1649,30 @@ ComputeServiceSession::Impl::SchedulePendingActions()
}
}
- // Sort by priority descending, then by LSN ascending (FIFO within same priority)
- std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) {
- if (A->Priority != B->Priority)
- {
- return A->Priority > B->Priority;
- }
- return A->ActionLsn < B->ActionLsn;
- });
+ PendingCount = m_PendingActions.size();
+ });
- if (ActionsToSchedule.size() > Capacity)
+ // Sort by priority descending, then by LSN ascending (FIFO within same priority)
+ std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) {
+ if (A->Priority != B->Priority)
{
- ActionsToSchedule.resize(Capacity);
+ return A->Priority > B->Priority;
}
-
- PendingCount = m_PendingActions.size();
+ return A->ActionLsn < B->ActionLsn;
});
+ if (ActionsToSchedule.size() > Capacity)
+ {
+ ActionsToSchedule.resize(Capacity);
+ }
+
if (ActionsToSchedule.empty())
{
_.Dismiss();
return;
}
- ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+ ZEN_DEBUG("attempting schedule of {} pending actions", ActionsToSchedule.size());
Stopwatch SubmitTimer;
std::vector<SubmitResult> SubmitResults = SubmitActions(ActionsToSchedule);
@@ -1663,10 +1692,10 @@ ComputeServiceSession::Impl::SchedulePendingActions()
}
}
- ZEN_INFO("scheduled {} pending actions in {} ({} rejected)",
- ScheduledActionCount,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- NotAcceptedCount);
+ ZEN_DEBUG("scheduled {} pending actions in {} ({} rejected)",
+ ScheduledActionCount,
+ NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+ NotAcceptedCount);
ScheduledCount += ScheduledActionCount;
PendingCount -= ScheduledActionCount;
@@ -1975,6 +2004,14 @@ ComputeServiceSession::Impl::HandleActionUpdates()
break;
}
+ // Rejected — runner was at capacity, reschedule without retry cost
+ case RunnerAction::State::Rejected:
+ {
+ Action->ResetActionStateToPending();
+ ZEN_DEBUG("action {} ({}) rescheduled after runner rejection", Action->ActionId, ActionLsn);
+ break;
+ }
+
// Terminal states — move to results, record history, notify queue
case RunnerAction::State::Completed:
case RunnerAction::State::Failed:
@@ -2009,6 +2046,14 @@ ComputeServiceSession::Impl::HandleActionUpdates()
MaxRetries);
break;
}
+ else
+ {
+ ZEN_WARN("action {} ({}) {} after {} retries, not rescheduling",
+ Action->ActionId,
+ ActionLsn,
+ RunnerAction::ToString(TerminalState),
+ Action->RetryCount.load(std::memory_order_relaxed));
+ }
}
m_ActionMapLock.WithExclusiveLock([&] {
@@ -2101,10 +2146,9 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>&
ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions");
std::vector<SubmitResult> Results(Actions.size());
- // First try submitting the batch to local runners in parallel
+ // First try submitting the batch to local runners
std::vector<SubmitResult> LocalResults = m_LocalRunnerGroup.SubmitActions(Actions);
- std::vector<size_t> RemoteIndices;
std::vector<Ref<RunnerAction>> RemoteActions;
for (size_t i = 0; i < Actions.size(); ++i)
@@ -2115,20 +2159,40 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>&
}
else
{
- RemoteIndices.push_back(i);
RemoteActions.push_back(Actions[i]);
+ Results[i] = SubmitResult{.IsAccepted = true, .Reason = "dispatched to remote"};
}
}
- // Submit remaining actions to remote runners in parallel
+ // Dispatch remaining actions to remote runners asynchronously.
+ // Mark actions as Submitting so the scheduler won't re-pick them.
+ // The remote runner will transition them to Running on success, or
+ // we mark them Rejected on rejection so HandleActionUpdates reschedules them without a retry-count increment.
if (!RemoteActions.empty())
{
- std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
-
- for (size_t j = 0; j < RemoteIndices.size(); ++j)
+ for (const Ref<RunnerAction>& Action : RemoteActions)
{
- Results[RemoteIndices[j]] = std::move(RemoteResults[j]);
+ Action->SetActionState(RunnerAction::State::Submitting);
}
+
+ m_RemoteSubmitPool.ScheduleWork(
+ [this, RemoteActions = std::move(RemoteActions)]() {
+ std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
+
+ for (size_t j = 0; j < RemoteResults.size(); ++j)
+ {
+ if (!RemoteResults[j].IsAccepted)
+ {
+ ZEN_DEBUG("remote submission rejected for action {} ({}): {}",
+ RemoteActions[j]->ActionId,
+ RemoteActions[j]->ActionLsn,
+ RemoteResults[j].Reason);
+
+ RemoteActions[j]->SetActionState(RunnerAction::State::Rejected);
+ }
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
return Results;
@@ -2194,16 +2258,22 @@ ComputeServiceSession::NotifyOrchestratorChanged()
m_Impl->NotifyOrchestratorChanged();
}
-void
+bool
ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
{
- m_Impl->StartRecording(InResolver, RecordingPath);
+ return m_Impl->StartRecording(InResolver, RecordingPath);
}
-void
+bool
ComputeServiceSession::StopRecording()
{
- m_Impl->StopRecording();
+ return m_Impl->StopRecording();
+}
+
+bool
+ComputeServiceSession::IsRecording() const
+{
+ return m_Impl->IsRecording();
}
ComputeServiceSession::ActionCounts