| field | value | date |
|---|---|---|
| author | Stefan Boberg <[email protected]> | 2026-04-13 16:38:16 +0200 |
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 16:38:16 +0200 |
| commit | 795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch) | |
| tree | 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute/httpcomputeservice.cpp | |
| parent | 5.8.4-pre2 (diff) | |
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Reworks the Horde agent subsystem from synchronous per-thread I/O to an async, ASIO-driven architecture, and adds provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and a dashboard UI for provisioner control.
### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)
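The async rewrite replaces blocking per-thread writes with a queue-and-chain pattern: callers may enqueue frames at any time, but at most one asynchronous write is outstanding, and each completion handler starts the next. A minimal sketch of that discipline, using a pluggable write callback in place of a real ASIO transport (the `FrameQueue` name and `WriteFn` signature are illustrative, not the actual `AsyncAgentMessageChannel` API):

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Single-writer frame queue: Send() may be called at any time, but only
// one asynchronous write is ever in flight on the transport.
class FrameQueue
{
public:
    // Write callback: sends one frame, then invokes OnDone when complete.
    using WriteFn = std::function<void(const std::string& Frame,
                                       std::function<void()> OnDone)>;

    explicit FrameQueue(WriteFn Write) : m_Write(std::move(Write)) {}

    // Enqueue a frame; start a write only if none is already in flight.
    void Send(std::string Frame)
    {
        m_Pending.push_back(std::move(Frame));
        if (!m_WriteInFlight)
        {
            StartNextWrite();
        }
    }

    size_t PendingCount() const { return m_Pending.size(); }

private:
    void StartNextWrite()
    {
        if (m_Pending.empty())
        {
            m_WriteInFlight = false;
            return;
        }
        m_WriteInFlight = true;
        std::string Frame = std::move(m_Pending.front());
        m_Pending.pop_front();
        // The completion handler chains the next write, mirroring the
        // async_write -> handler -> async_write loop in ASIO code.
        m_Write(Frame, [this] { StartNextWrite(); });
    }

    WriteFn m_Write;
    std::deque<std::string> m_Pending;
    bool m_WriteInFlight = false;
};
```

Injecting the write function keeps the drain loop testable without sockets; per the bullet above, the real channel also drives timeouts with ASIO timers.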
### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers
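The drain selection described above can be sketched as a largest-fit pass over per-agent status, with workload breaking ties. The `AgentStatus` fields and the helper below are illustrative assumptions, not the actual `HordeProvisioner` types:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative per-agent snapshot, as might be gathered from each agent's
// /compute/session/status endpoint.
struct AgentStatus
{
    int      Cores;          // cores this agent contributes to the pool
    uint64_t RunningActions; // current workload reported by the agent
};

// Pick agents to drain until CoresToRemove cores are covered: prefer the
// largest agents that still fit the remaining deficit, breaking ties by
// lowest workload. Returns indices into Agents.
std::vector<size_t> SelectDrainCandidates(const std::vector<AgentStatus>& Agents,
                                          int CoresToRemove)
{
    std::vector<size_t> Order(Agents.size());
    for (size_t i = 0; i < Order.size(); ++i) Order[i] = i;

    // Largest-fit first; among equal sizes, least busy first.
    std::sort(Order.begin(), Order.end(), [&](size_t A, size_t B) {
        if (Agents[A].Cores != Agents[B].Cores)
            return Agents[A].Cores > Agents[B].Cores;
        return Agents[A].RunningActions < Agents[B].RunningActions;
    });

    std::vector<size_t> Selected;
    int Removed = 0;
    for (size_t Idx : Order)
    {
        if (Removed >= CoresToRemove)
            break;
        if (Agents[Idx].Cores <= CoresToRemove - Removed) // fits remaining need
        {
            Selected.push_back(Idx);
            Removed += Agents[Idx].Cores;
        }
    }
    return Selected;
}
```

Each selected agent would then be sent `/compute/session/drain` and force-killed only after the `--horde-drain-grace-period` elapses.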
### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override
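A refreshable token provider of this shape caches the most recent token and re-invokes a fetch callback (e.g. one that shells out to `OidcToken.exe`) shortly before expiry. A minimal sketch, with the class shape, refresh margin, and `FetchFn` signature all assumed for illustration:

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <string>

// Caches an access token and refreshes it via a callback when it is
// close to expiry, so callers always see a valid bearer token.
class AccessTokenProvider
{
public:
    using Clock = std::chrono::steady_clock;

    struct Token
    {
        std::string       Value;
        Clock::time_point Expiry;
    };

    using FetchFn = std::function<Token()>; // e.g. runs OidcToken.exe

    explicit AccessTokenProvider(FetchFn Fetch) : m_Fetch(std::move(Fetch)) {}

    // Return the cached token, refreshing within 60s of expiry.
    std::string Get()
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        const auto Margin = std::chrono::seconds(60);
        if (m_Cached.Value.empty() || Clock::now() + Margin >= m_Cached.Expiry)
        {
            m_Cached = m_Fetch(); // blocking refresh under the lock
        }
        return m_Cached.Value;
    }

private:
    FetchFn    m_Fetch;
    Token      m_Cached;
    std::mutex m_Mutex;
};
```

Passing a provider instead of a static `--horde-token` lets long-lived clients survive token expiry without restarts.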
### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`
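The lock-contention fix follows a common pattern: copy the pending set under a shared (reader) lock, then perform the O(n log n) sort and trimming on the copy with no lock held, so submitters are never blocked behind scheduling work. A sketch under assumed types (the real scheduler's containers and fields will differ):

```cpp
#include <algorithm>
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Illustrative pending-action record; higher priority schedules first.
struct PendingAction
{
    int Id;
    int Priority;
};

class Scheduler
{
public:
    void Add(PendingAction A)
    {
        std::unique_lock<std::shared_mutex> Lock(m_Lock);
        m_Pending.push_back(A);
    }

    // Return up to MaxCount actions in priority order. The shared lock is
    // held only for the cheap snapshot copy, never for the sort.
    std::vector<PendingAction> PickBatch(size_t MaxCount)
    {
        std::vector<PendingAction> Snapshot;
        {
            std::shared_lock<std::shared_mutex> Lock(m_Lock);
            Snapshot = m_Pending; // copy under shared lock only
        }
        std::sort(Snapshot.begin(), Snapshot.end(),
                  [](const PendingAction& A, const PendingAction& B) {
                      return A.Priority > B.Priority;
                  });
        if (Snapshot.size() > MaxCount)
            Snapshot.resize(MaxCount);
        return Snapshot;
    }

private:
    std::shared_mutex          m_Lock;
    std::vector<PendingAction> m_Pending;
};
```

The trade-off is that the snapshot can be momentarily stale, which is harmless here: a just-added action is simply picked up on the next scheduling pass.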
### Remote Execution
- Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)
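In-flight submission tracking reduces to a capped counter the runner consults before accepting new work; when the cap is reached, the action can be bounced back to the scheduler (cf. the `Rejected` state above). A sketch with assumed names and an assumed cap:

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Caps the number of submissions a runner will hold in flight at once,
// so the scheduler cannot over-queue a single remote runner.
class InFlightTracker
{
public:
    explicit InFlightTracker(size_t MaxInFlight) : m_Max(MaxInFlight) {}

    // Claim a slot; returns false when the runner is at capacity and the
    // caller should reschedule the action elsewhere.
    bool TryBegin()
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        if (m_InFlight >= m_Max)
            return false;
        ++m_InFlight;
        return true;
    }

    // Release a slot once the submission completes or fails.
    void End()
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        --m_InFlight;
    }

    size_t Count() const
    {
        std::lock_guard<std::mutex> Lock(m_Mutex);
        return m_InFlight;
    }

private:
    mutable std::mutex m_Mutex;
    size_t             m_Max;
    size_t             m_InFlight = 0;
};
```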
### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidating them into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row
### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)
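Atomic counters let many worker threads report progress lock-free while a display thread polls for rendering. A sketch of the two-phase (submit/completion) counter idea, with names assumed rather than taken from the `zen exec` internals:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Lock-free work counters for a two-phase progress bar: workers bump the
// counters from any thread; the display thread reads an approximate but
// monotonically advancing snapshot.
struct ProgressCounters
{
    std::atomic<uint64_t> Total{0};
    std::atomic<uint64_t> Submitted{0};
    std::atomic<uint64_t> Completed{0};

    void OnQueued()    { Total.fetch_add(1, std::memory_order_relaxed); }
    void OnSubmitted() { Submitted.fetch_add(1, std::memory_order_relaxed); }
    void OnCompleted() { Completed.fetch_add(1, std::memory_order_relaxed); }

    // Percentage for the completion phase; the submit phase would read
    // Submitted/Total the same way.
    int CompletionPercent() const
    {
        uint64_t T = Total.load(std::memory_order_relaxed);
        if (T == 0)
            return 0;
        return static_cast<int>(
            Completed.load(std::memory_order_relaxed) * 100 / T);
    }
};
```

Relaxed ordering suffices because the counters only feed a display; no other data is synchronized through them.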
### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners apply `nice(5)` to child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
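Environment-variable expansion for a setting like `DataDir` can be as simple as a single scan for `${NAME}` tokens. The `${NAME}` syntax and the keep-unknown-variables behavior below are assumptions for illustration, not necessarily what `DataDir` accepts:

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Replace ${NAME} tokens with the corresponding environment variable.
// Unknown variables and unterminated ${ sequences are left intact.
std::string ExpandEnvVars(const std::string& Input)
{
    std::string Out;
    size_t Pos = 0;
    while (Pos < Input.size())
    {
        size_t Start = Input.find("${", Pos);
        if (Start == std::string::npos)
        {
            Out.append(Input, Pos, std::string::npos); // tail, no more tokens
            break;
        }
        size_t End = Input.find('}', Start + 2);
        if (End == std::string::npos)
        {
            Out.append(Input, Pos, std::string::npos); // unterminated token
            break;
        }
        Out.append(Input, Pos, Start - Pos); // literal text before the token
        std::string Name = Input.substr(Start + 2, End - Start - 2);
        if (const char* Value = std::getenv(Name.c_str()))
            Out += Value;                              // substitute
        else
            Out.append(Input, Start, End - Start + 1); // keep unknown as-is
        Pos = End + 1;
    }
    return Out;
}
```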
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 95 |
|---|---|---|

1 file changed, 91 insertions(+), 4 deletions(-)
```diff
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index 6cb975dd3..8cbb25afd 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -62,6 +62,8 @@ struct HttpComputeService::Impl
     RwLock m_WsConnectionsLock;
     std::vector<Ref<WebSocketConnection>> m_WsConnections;
 
+    std::function<void()> m_ShutdownCallback;
+
     // Metrics
     metrics::OperationTiming m_HttpRequests;
@@ -190,6 +192,65 @@ HttpComputeService::Impl::RegisterRoutes()
         HttpVerb::kPost);
 
     m_Router.RegisterRoute(
+        "session/drain",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Draining))
+            {
+                CbObjectWriter Cbo;
+                Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+                return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+            }
+
+            CbObjectWriter Cbo;
+            Cbo << "error"sv
+                << "Cannot transition to Draining from current state"sv;
+            HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "session/status",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+            auto Counts = m_ComputeService.GetActionCounts();
+            Cbo << "actions_pending"sv << Counts.Pending;
+            Cbo << "actions_running"sv << Counts.Running;
+            Cbo << "actions_completed"sv << Counts.Completed;
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "session/sunset",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Sunset))
+            {
+                CbObjectWriter Cbo;
+                Cbo << "state"sv << ToString(m_ComputeService.GetSessionState());
+                HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+
+                if (m_ShutdownCallback)
+                {
+                    m_ShutdownCallback();
+                }
+                return;
+            }
+
+            CbObjectWriter Cbo;
+            Cbo << "error"sv
+                << "Cannot transition to Sunset from current state"sv;
+            HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
         "workers",
         [this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
         HttpVerb::kGet);
@@ -506,9 +567,19 @@ HttpComputeService::Impl::RegisterRoutes()
             return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
         }
 
-        m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording");
+        std::filesystem::path RecordingPath = m_BaseDir / "recording";
 
-        return HttpReq.WriteResponse(HttpResponseCode::OK);
+        if (!m_ComputeService.StartRecording(m_CombinedResolver, RecordingPath))
+        {
+            CbObjectWriter Cbo;
+            Cbo << "error"
+                << "recording is already active";
+            return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+        }
+
+        CbObjectWriter Cbo;
+        Cbo << "path" << RecordingPath.string();
+        return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
     },
     HttpVerb::kPost);
@@ -522,9 +593,19 @@ HttpComputeService::Impl::RegisterRoutes()
             return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
         }
 
-        m_ComputeService.StopRecording();
+        std::filesystem::path RecordingPath = m_BaseDir / "recording";
+
+        if (!m_ComputeService.StopRecording())
+        {
+            CbObjectWriter Cbo;
+            Cbo << "error"
+                << "no recording is active";
+            return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+        }
 
-        return HttpReq.WriteResponse(HttpResponseCode::OK);
+        CbObjectWriter Cbo;
+        Cbo << "path" << RecordingPath.string();
+        return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
     },
     HttpVerb::kPost);
@@ -1066,6 +1147,12 @@ HttpComputeService::GetActionCounts()
 {
     return m_Impl->m_ComputeService.GetActionCounts();
 }
 
+void
+HttpComputeService::SetShutdownCallback(std::function<void()> Callback)
+{
+    m_Impl->m_ShutdownCallback = std::move(Callback);
+}
+
 const char*
 HttpComputeService::BaseUri() const
 {
```