diff options
| author | Stefan Boberg <[email protected]> | 2026-04-13 16:38:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 16:38:16 +0200 |
| commit | 795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch) | |
| tree | 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute/computeservice.cpp | |
| parent | 5.8.4-pre2 (diff) | |
| download | zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip | |
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control.
### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)
### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers
### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override
### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`
### Remote Execution
- Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)
### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row
### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)
### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners `nice(5)` child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zencompute/computeservice.cpp')
| -rw-r--r-- | src/zencompute/computeservice.cpp | 164 |
1 files changed, 117 insertions, 47 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index aaf34cbe2..852e93fa0 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -121,6 +121,8 @@ struct ComputeServiceSession::Impl , m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) , m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) { + m_RemoteRunnerGroup.SetWorkerPool(&m_RemoteSubmitPool); + // Create a non-expiring, non-deletable implicit queue for legacy endpoints auto Result = CreateQueue("implicit"sv, {}, {}); m_ImplicitQueueId = Result.QueueId; @@ -240,8 +242,9 @@ struct ComputeServiceSession::Impl // Recording - void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); - void StopRecording(); + bool StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); + bool StopRecording(); + bool IsRecording() const; std::unique_ptr<ActionRecorder> m_Recorder; @@ -615,6 +618,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() m_KnownWorkerUris.insert(UriStr); auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool); + NewRunner->SetRemoteHostname(Hostname); SyncWorkersToRunner(*NewRunner); m_RemoteRunnerGroup.AddRunner(NewRunner); } @@ -716,24 +720,44 @@ ComputeServiceSession::Impl::ShutdownRunners() m_RemoteRunnerGroup.Shutdown(); } -void +bool ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) { + if (m_Recorder) + { + ZEN_WARN("recording is already active"); + return false; + } + ZEN_INFO("starting recording to '{}'", RecordingPath); m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath); ZEN_INFO("started recording to '{}'", RecordingPath); + return true; } -void +bool ComputeServiceSession::Impl::StopRecording() { + if (!m_Recorder) + { + ZEN_WARN("no recording is active"); + return false; + } + ZEN_INFO("stopping recording"); m_Recorder = nullptr; ZEN_INFO("stopped recording"); + return true; +} + +bool +ComputeServiceSession::Impl::IsRecording() const +{ + return m_Recorder != nullptr; } std::vector<ComputeServiceSession::RunningActionInfo> @@ -1128,6 +1152,10 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo) Cbo.BeginObject(); Cbo << "lsn"sv << Lsn; Cbo << "state"sv << RunnerAction::ToString(Action->ActionState()); + if (!Action->FailureReason.empty()) + { + Cbo << "reason"sv << Action->FailureReason; + } Cbo.EndObject(); } }); @@ -1416,8 +1444,8 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo) if (Queue) { - Queue->m_Lock.WithSharedLock([&] { - m_ActionMapLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { + Queue->m_Lock.WithSharedLock([&] { for (int Lsn : Queue->FinishedLsns) { if (m_ResultsMap.contains(Lsn)) @@ -1530,12 +1558,12 @@ ComputeServiceSession::Impl::SchedulePendingActions() static Stopwatch DumpRunningTimer; auto _ = MakeGuard([&] { - ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", - ScheduledCount, - RunningCount, - m_RetiredCount.load(), - PendingCount, - ResultCount); + ZEN_DEBUG("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", + ScheduledCount, + RunningCount, + m_RetiredCount.load(), + PendingCount, + ResultCount); if (DumpRunningTimer.GetElapsedTimeMs() > 30000) { @@ -1584,13 +1612,13 @@ ComputeServiceSession::Impl::SchedulePendingActions() // Also note that the m_PendingActions list is not maintained // here, that's done periodically in SchedulePendingActions() - m_ActionMapLock.WithExclusiveLock([&] { - if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) - { - return; - } + // Extract pending actions under a shared lock — we only need to read + // the map and take Ref copies. ActionState() is atomic so this is safe. + // Sorting and capacity trimming happen outside the lock to avoid + // blocking HTTP handlers on O(N log N) work with large pending queues. - if (m_PendingActions.empty()) + m_ActionMapLock.WithSharedLock([&] { + if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) { return; } @@ -1610,6 +1638,7 @@ ComputeServiceSession::Impl::SchedulePendingActions() case RunnerAction::State::Completed: case RunnerAction::State::Failed: case RunnerAction::State::Abandoned: + case RunnerAction::State::Rejected: case RunnerAction::State::Cancelled: break; @@ -1620,30 +1649,30 @@ ComputeServiceSession::Impl::SchedulePendingActions() } } - // Sort by priority descending, then by LSN ascending (FIFO within same priority) - std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) { - if (A->Priority != B->Priority) - { - return A->Priority > B->Priority; - } - return A->ActionLsn < B->ActionLsn; - }); + PendingCount = m_PendingActions.size(); + }); - if (ActionsToSchedule.size() > Capacity) + // Sort by priority descending, then by LSN ascending (FIFO within same priority) + std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) { + if (A->Priority != B->Priority) { - ActionsToSchedule.resize(Capacity); + return A->Priority > B->Priority; } - - PendingCount = m_PendingActions.size(); + return A->ActionLsn < B->ActionLsn; }); + if (ActionsToSchedule.size() > Capacity) + { + ActionsToSchedule.resize(Capacity); + } + if (ActionsToSchedule.empty()) { _.Dismiss(); return; } - ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); + ZEN_DEBUG("attempting schedule of {} pending actions", ActionsToSchedule.size()); Stopwatch SubmitTimer; std::vector<SubmitResult> SubmitResults = SubmitActions(ActionsToSchedule); @@ -1663,10 +1692,10 @@ ComputeServiceSession::Impl::SchedulePendingActions() } } - ZEN_INFO("scheduled {} pending actions in {} ({} rejected)", - ScheduledActionCount, - NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), - NotAcceptedCount); + ZEN_DEBUG("scheduled {} pending actions in {} ({} rejected)", + ScheduledActionCount, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + NotAcceptedCount); ScheduledCount += ScheduledActionCount; PendingCount -= ScheduledActionCount; @@ -1975,6 +2004,14 @@ ComputeServiceSession::Impl::HandleActionUpdates() break; } + // Rejected — runner was at capacity, reschedule without retry cost + case RunnerAction::State::Rejected: + { + Action->ResetActionStateToPending(); + ZEN_DEBUG("action {} ({}) rescheduled after runner rejection", Action->ActionId, ActionLsn); + break; + } + // Terminal states — move to results, record history, notify queue case RunnerAction::State::Completed: case RunnerAction::State::Failed: @@ -2009,6 +2046,14 @@ ComputeServiceSession::Impl::HandleActionUpdates() MaxRetries); break; } + else + { + ZEN_WARN("action {} ({}) {} after {} retries, not rescheduling", + Action->ActionId, + ActionLsn, + RunnerAction::ToString(TerminalState), + Action->RetryCount.load(std::memory_order_relaxed)); + } } m_ActionMapLock.WithExclusiveLock([&] { @@ -2101,10 +2146,9 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions"); std::vector<SubmitResult> Results(Actions.size()); - // First try submitting the batch to local runners in parallel + // First try submitting the batch to local runners std::vector<SubmitResult> LocalResults = m_LocalRunnerGroup.SubmitActions(Actions); - std::vector<size_t> RemoteIndices; std::vector<Ref<RunnerAction>> RemoteActions; for (size_t i = 0; i < Actions.size(); ++i) @@ -2115,20 +2159,40 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& } else { - RemoteIndices.push_back(i); RemoteActions.push_back(Actions[i]); + Results[i] = SubmitResult{.IsAccepted = true, .Reason = "dispatched to remote"}; } } - // Submit remaining actions to remote runners in parallel + // Dispatch remaining actions to remote runners asynchronously. + // Mark actions as Submitting so the scheduler won't re-pick them. + // The remote runner will transition them to Running on success, or + // we mark them Failed on rejection so HandleActionUpdates retries. if (!RemoteActions.empty()) { - std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions); - - for (size_t j = 0; j < RemoteIndices.size(); ++j) + for (const Ref<RunnerAction>& Action : RemoteActions) { - Results[RemoteIndices[j]] = std::move(RemoteResults[j]); + Action->SetActionState(RunnerAction::State::Submitting); } + + m_RemoteSubmitPool.ScheduleWork( + [this, RemoteActions = std::move(RemoteActions)]() { + std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions); + + for (size_t j = 0; j < RemoteResults.size(); ++j) + { + if (!RemoteResults[j].IsAccepted) + { + ZEN_DEBUG("remote submission rejected for action {} ({}): {}", + RemoteActions[j]->ActionId, + RemoteActions[j]->ActionLsn, + RemoteResults[j].Reason); + + RemoteActions[j]->SetActionState(RunnerAction::State::Rejected); + } + } + }, + WorkerThreadPool::EMode::EnableBacklog); } return Results; @@ -2194,16 +2258,22 @@ ComputeServiceSession::NotifyOrchestratorChanged() m_Impl->NotifyOrchestratorChanged(); } -void +bool ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) { - m_Impl->StartRecording(InResolver, RecordingPath); + return m_Impl->StartRecording(InResolver, RecordingPath); } -void +bool ComputeServiceSession::StopRecording() { - m_Impl->StopRecording(); + return m_Impl->StopRecording(); +} + +bool +ComputeServiceSession::IsRecording() const +{ + return m_Impl->IsRecording(); } ComputeServiceSession::ActionCounts |