diff options
| author | Stefan Boberg <[email protected]> | 2026-04-13 16:38:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 16:38:16 +0200 |
| commit | 795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch) | |
| tree | 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute/include | |
| parent | 5.8.4-pre2 (diff) | |
| download | archived-zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz archived-zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip | |
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control.
### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)
### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers
### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override
### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`
### Remote Execution
- Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)
### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row
### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)
### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners `nice(5)` child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zencompute/include')
5 files changed, 74 insertions, 4 deletions
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h index ad556f546..97de4321a 100644 --- a/src/zencompute/include/zencompute/computeservice.h +++ b/src/zencompute/include/zencompute/computeservice.h @@ -279,7 +279,7 @@ public: // sized to match RunnerAction::State::_Count but we can't use the enum here // for dependency reasons, so just use a fixed size array and static assert in // the implementation file - uint64_t Timestamps[9] = {}; + uint64_t Timestamps[10] = {}; }; [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100); @@ -305,8 +305,9 @@ public: // Recording - void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); - void StopRecording(); + bool StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); + bool StopRecording(); + bool IsRecording() const; private: void PostUpdate(RunnerAction* Action); diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h index db3fce3c2..32f54f293 100644 --- a/src/zencompute/include/zencompute/httpcomputeservice.h +++ b/src/zencompute/include/zencompute/httpcomputeservice.h @@ -35,6 +35,10 @@ public: void Shutdown(); + /** Set a callback to be invoked when the session/sunset endpoint is hit. + * Typically wired to HttpServer::RequestExit() to shut down the process. */ + void SetShutdownCallback(std::function<void()> Callback); + [[nodiscard]] ComputeServiceSession::ActionCounts GetActionCounts(); const char* BaseUri() const override; diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h index 58b2c9152..ef0a1269a 100644 --- a/src/zencompute/include/zencompute/httporchestrator.h +++ b/src/zencompute/include/zencompute/httporchestrator.h @@ -2,10 +2,12 @@ #pragma once +#include <zencompute/provisionerstate.h> #include <zencompute/zencompute.h> #include <zencore/logging.h> #include <zencore/thread.h> +#include <zencore/uid.h> #include <zenhttp/httpserver.h> #include <zenhttp/websocket.h> @@ -65,6 +67,16 @@ public: */ void Shutdown(); + /** Return the session ID generated at construction time. Provisioners + * pass this to spawned workers so the orchestrator can reject stale + * announcements from previous sessions. */ + Oid GetSessionId() const { return m_SessionId; } + + /** Register a provisioner whose target core count can be read and changed + * via the orchestrator HTTP API and dashboard. Caller retains ownership; + * the provider must outlive this service. */ + void SetProvisionerStateProvider(IProvisionerStateProvider* Provider); + virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; @@ -81,6 +93,11 @@ private: std::unique_ptr<OrchestratorService> m_Service; std::string m_Hostname; + Oid m_SessionId; + bool ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId); + + std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr}; + // WebSocket push #if ZEN_WITH_WEBSOCKETS diff --git a/src/zencompute/include/zencompute/orchestratorservice.h b/src/zencompute/include/zencompute/orchestratorservice.h index 549ff8e3c..2c49e22df 100644 --- a/src/zencompute/include/zencompute/orchestratorservice.h +++ b/src/zencompute/include/zencompute/orchestratorservice.h @@ -6,6 +6,7 @@ #if ZEN_WITH_COMPUTE_SERVICES +# include <zencompute/provisionerstate.h> # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> # include <zencore/logbase.h> @@ -90,9 +91,16 @@ public: std::string Hostname; }; - CbObject GetWorkerList(); + /** Per-worker callback invoked during GetWorkerList serialization. + * The callback receives the worker ID and a CbObjectWriter positioned + * inside the worker's object, allowing the caller to append extra fields. */ + using WorkerAnnotator = std::function<void(std::string_view WorkerId, CbObjectWriter& Cbo)>; + + CbObject GetWorkerList(const WorkerAnnotator& Annotate = {}); void AnnounceWorker(const WorkerAnnouncement& Announcement); + void SetProvisionerStateProvider(IProvisionerStateProvider* Provider); + bool IsWorkerWebSocketEnabled() const; void SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected); @@ -171,6 +179,8 @@ private: LoggerRef m_Log{"compute.orchestrator"}; bool m_EnableWorkerWebSocket = false; + std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr}; + std::thread m_ProbeThread; std::atomic<bool> m_ProbeThreadEnabled{true}; Event m_ProbeThreadEvent; diff --git a/src/zencompute/include/zencompute/provisionerstate.h b/src/zencompute/include/zencompute/provisionerstate.h new file mode 100644 index 000000000..e9af8a635 --- /dev/null +++ b/src/zencompute/include/zencompute/provisionerstate.h @@ -0,0 +1,38 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> +#include <string_view> + +namespace zen::compute { + +/** Per-agent provisioning status as seen by the provisioner. */ +enum class AgentProvisioningStatus +{ + Unknown, ///< Not known to the provisioner + Active, ///< Running and allocated + Draining, ///< Being gracefully deprovisioned +}; + +/** Abstract interface for querying and controlling a provisioner from the HTTP layer. + * This decouples the orchestrator service from specific provisioner implementations. */ +class IProvisionerStateProvider +{ +public: + virtual ~IProvisionerStateProvider() = default; + + virtual std::string_view GetName() const = 0; ///< e.g. "horde", "nomad" + virtual uint32_t GetTargetCoreCount() const = 0; + virtual uint32_t GetEstimatedCoreCount() const = 0; + virtual uint32_t GetActiveCoreCount() const = 0; + virtual uint32_t GetAgentCount() const = 0; + virtual uint32_t GetDrainingAgentCount() const { return 0; } + virtual void SetTargetCoreCount(uint32_t Count) = 0; + + /** Return the provisioning status for a worker by its orchestrator ID + * (e.g. "horde-{LeaseId}"). Returns Unknown if the ID is not recognized. */ + virtual AgentProvisioningStatus GetAgentStatus(std::string_view /*WorkerId*/) const { return AgentProvisioningStatus::Unknown; } +}; + +} // namespace zen::compute |