author: Stefan Boberg <[email protected]> 2026-04-13 16:38:16 +0200
committer: GitHub Enterprise <[email protected]> 2026-04-13 16:38:16 +0200
commit: 795345e5fd7974a1f5227d507a58bb3ed75eafd5
tree: 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute/include
parent: 5.8.4-pre2
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control.

### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent`, an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)

### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers

### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override

### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state: when a remote runner declines at capacity, the action is rescheduled without retry count increment
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`

### Remote Execution
- Eager-attach mode for `RemoteHttpRunner`: bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)

### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row

### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)

### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners `nice(5)` child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
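
The scheduler item about snapshotting pending actions under a shared lock and sorting/trimming outside it is a common way to shorten lock hold times. The scheduler source is not part of this diff, so the following is only a minimal sketch of that general pattern; `PendingAction`, `PendingQueue`, and `SnapshotTopN` are hypothetical names chosen for illustration, and the priority ordering is an assumption.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Hypothetical stand-in for a queued action; the real type is not shown in this diff.
struct PendingAction
{
    uint64_t Id       = 0;
    int      Priority = 0;
};

// Hypothetical queue illustrating "copy under shared lock, sort outside the lock".
class PendingQueue
{
public:
    void Add(PendingAction Action)
    {
        std::unique_lock Lock(m_Mutex);  // writers take the exclusive lock
        m_Pending.push_back(Action);
    }

    // Returns up to MaxCount actions, highest priority first.
    std::vector<PendingAction> SnapshotTopN(size_t MaxCount) const
    {
        std::vector<PendingAction> Snapshot;
        {
            // The shared lock is held only for the duration of the copy.
            std::shared_lock Lock(m_Mutex);
            Snapshot = m_Pending;
        }

        // Sorting and trimming operate on the private copy, so other
        // readers and writers are not blocked while this runs.
        std::stable_sort(Snapshot.begin(), Snapshot.end(), [](const PendingAction& A, const PendingAction& B) {
            return A.Priority > B.Priority;
        });
        if (Snapshot.size() > MaxCount)
        {
            Snapshot.resize(MaxCount);
        }
        return Snapshot;
    }

private:
    mutable std::shared_mutex  m_Mutex;
    std::vector<PendingAction> m_Pending;
};
```

The trade-off is an extra copy of the pending list per scheduling pass in exchange for keeping the O(n log n) sort out of the critical section.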
Diffstat (limited to 'src/zencompute/include')
-rw-r--r-- src/zencompute/include/zencompute/computeservice.h | 7
-rw-r--r-- src/zencompute/include/zencompute/httpcomputeservice.h | 4
-rw-r--r-- src/zencompute/include/zencompute/httporchestrator.h | 17
-rw-r--r-- src/zencompute/include/zencompute/orchestratorservice.h | 12
-rw-r--r-- src/zencompute/include/zencompute/provisionerstate.h | 38
5 files changed, 74 insertions, 4 deletions
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h
index ad556f546..97de4321a 100644
--- a/src/zencompute/include/zencompute/computeservice.h
+++ b/src/zencompute/include/zencompute/computeservice.h
@@ -279,7 +279,7 @@ public:
// sized to match RunnerAction::State::_Count but we can't use the enum here
// for dependency reasons, so just use a fixed size array and static assert in
// the implementation file
- uint64_t Timestamps[9] = {};
+ uint64_t Timestamps[10] = {};
};
[[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
@@ -305,8 +305,9 @@ public:
// Recording
- void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
- void StopRecording();
+ bool StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+ bool StopRecording();
+ bool IsRecording() const;
private:
void PostUpdate(RunnerAction* Action);
diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h
index db3fce3c2..32f54f293 100644
--- a/src/zencompute/include/zencompute/httpcomputeservice.h
+++ b/src/zencompute/include/zencompute/httpcomputeservice.h
@@ -35,6 +35,10 @@ public:
void Shutdown();
+ /** Set a callback to be invoked when the session/sunset endpoint is hit.
+ * Typically wired to HttpServer::RequestExit() to shut down the process. */
+ void SetShutdownCallback(std::function<void()> Callback);
+
[[nodiscard]] ComputeServiceSession::ActionCounts GetActionCounts();
const char* BaseUri() const override;
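
Review note: the new `SetShutdownCallback` hook is described as being invoked when `session/sunset` is hit and typically wired to `HttpServer::RequestExit()`. The wiring itself is outside this diff, so the sketch below only illustrates the callback plumbing with stand-in types. `FakeHttpServer`, `FakeComputeService`, and `HandleSunset` are hypothetical and do not reflect the real classes.

```cpp
#include <functional>
#include <utility>

// Hypothetical stand-in for the HTTP server; only the exit request matters here.
class FakeHttpServer
{
public:
    void RequestExit() { m_ExitRequested = true; }
    bool IsExitRequested() const { return m_ExitRequested; }

private:
    bool m_ExitRequested = false;
};

// Hypothetical stand-in for the compute HTTP service, reduced to the callback plumbing.
class FakeComputeService
{
public:
    // Same shape as the declaration added in the diff.
    void SetShutdownCallback(std::function<void()> Callback) { m_ShutdownCallback = std::move(Callback); }

    // Assumed to be what a session/sunset request handler would end up calling.
    void HandleSunset()
    {
        if (m_ShutdownCallback)  // tolerate the case where no callback was registered
        {
            m_ShutdownCallback();
        }
    }

private:
    std::function<void()> m_ShutdownCallback;
};
```

A caller would register the hook once at startup, for example `Service.SetShutdownCallback([&Server] { Server.RequestExit(); });`, which keeps the service free of a direct dependency on the server type.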
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
index 58b2c9152..ef0a1269a 100644
--- a/src/zencompute/include/zencompute/httporchestrator.h
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -2,10 +2,12 @@
#pragma once
+#include <zencompute/provisionerstate.h>
#include <zencompute/zencompute.h>
#include <zencore/logging.h>
#include <zencore/thread.h>
+#include <zencore/uid.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/websocket.h>
@@ -65,6 +67,16 @@ public:
*/
void Shutdown();
+ /** Return the session ID generated at construction time. Provisioners
+ * pass this to spawned workers so the orchestrator can reject stale
+ * announcements from previous sessions. */
+ Oid GetSessionId() const { return m_SessionId; }
+
+ /** Register a provisioner whose target core count can be read and changed
+ * via the orchestrator HTTP API and dashboard. Caller retains ownership;
+ * the provider must outlive this service. */
+ void SetProvisionerStateProvider(IProvisionerStateProvider* Provider);
+
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
@@ -81,6 +93,11 @@ private:
std::unique_ptr<OrchestratorService> m_Service;
std::string m_Hostname;
+ Oid m_SessionId;
+ bool ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId);
+
+ std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr};
+
// WebSocket push
#if ZEN_WITH_WEBSOCKETS
diff --git a/src/zencompute/include/zencompute/orchestratorservice.h b/src/zencompute/include/zencompute/orchestratorservice.h
index 549ff8e3c..2c49e22df 100644
--- a/src/zencompute/include/zencompute/orchestratorservice.h
+++ b/src/zencompute/include/zencompute/orchestratorservice.h
@@ -6,6 +6,7 @@
#if ZEN_WITH_COMPUTE_SERVICES
+# include <zencompute/provisionerstate.h>
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/logbase.h>
@@ -90,9 +91,16 @@ public:
std::string Hostname;
};
- CbObject GetWorkerList();
+ /** Per-worker callback invoked during GetWorkerList serialization.
+ * The callback receives the worker ID and a CbObjectWriter positioned
+ * inside the worker's object, allowing the caller to append extra fields. */
+ using WorkerAnnotator = std::function<void(std::string_view WorkerId, CbObjectWriter& Cbo)>;
+
+ CbObject GetWorkerList(const WorkerAnnotator& Annotate = {});
void AnnounceWorker(const WorkerAnnouncement& Announcement);
+ void SetProvisionerStateProvider(IProvisionerStateProvider* Provider);
+
bool IsWorkerWebSocketEnabled() const;
void SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected);
@@ -171,6 +179,8 @@ private:
LoggerRef m_Log{"compute.orchestrator"};
bool m_EnableWorkerWebSocket = false;
+ std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr};
+
std::thread m_ProbeThread;
std::atomic<bool> m_ProbeThreadEnabled{true};
Event m_ProbeThreadEvent;
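
Review note: `GetWorkerList` now takes an optional `WorkerAnnotator` that lets the caller append fields to each worker's object during serialization. The sketch below shows the same callback shape with a stand-in writer, since `CbObjectWriter` is not reproduced here. `FakeObjectWriter`, its `AddString` method, `BuildWorkerList`, and the `"provisioning"` field name are all hypothetical.

```cpp
#include <functional>
#include <map>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stand-in for CbObjectWriter: collects string fields for one worker.
struct FakeObjectWriter
{
    std::map<std::string, std::string> Fields;

    void AddString(std::string_view Name, std::string_view Value) { Fields[std::string(Name)] = std::string(Value); }
};

// Same callback shape as the WorkerAnnotator alias in the diff, using the stand-in writer.
using WorkerAnnotator = std::function<void(std::string_view WorkerId, FakeObjectWriter& Writer)>;

// Hypothetical serializer: writes the base fields, then gives the caller a chance
// to append extra fields while the worker's object is still open.
std::vector<FakeObjectWriter> BuildWorkerList(const std::vector<std::string>& WorkerIds, const WorkerAnnotator& Annotate = {})
{
    std::vector<FakeObjectWriter> Result;
    for (const std::string& Id : WorkerIds)
    {
        FakeObjectWriter Writer;
        Writer.AddString("id", Id);
        if (Annotate)  // a default-constructed std::function is empty and is skipped
        {
            Annotate(Id, Writer);
        }
        Result.push_back(std::move(Writer));
    }
    return Result;
}
```

Defaulting the annotator to an empty `std::function` keeps existing call sites source-compatible, which matches the `= {}` default in the new declaration.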
diff --git a/src/zencompute/include/zencompute/provisionerstate.h b/src/zencompute/include/zencompute/provisionerstate.h
new file mode 100644
index 000000000..e9af8a635
--- /dev/null
+++ b/src/zencompute/include/zencompute/provisionerstate.h
@@ -0,0 +1,38 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+
+namespace zen::compute {
+
+/** Per-agent provisioning status as seen by the provisioner. */
+enum class AgentProvisioningStatus
+{
+ Unknown, ///< Not known to the provisioner
+ Active, ///< Running and allocated
+ Draining, ///< Being gracefully deprovisioned
+};
+
+/** Abstract interface for querying and controlling a provisioner from the HTTP layer.
+ * This decouples the orchestrator service from specific provisioner implementations. */
+class IProvisionerStateProvider
+{
+public:
+ virtual ~IProvisionerStateProvider() = default;
+
+ virtual std::string_view GetName() const = 0; ///< e.g. "horde", "nomad"
+ virtual uint32_t GetTargetCoreCount() const = 0;
+ virtual uint32_t GetEstimatedCoreCount() const = 0;
+ virtual uint32_t GetActiveCoreCount() const = 0;
+ virtual uint32_t GetAgentCount() const = 0;
+ virtual uint32_t GetDrainingAgentCount() const { return 0; }
+ virtual void SetTargetCoreCount(uint32_t Count) = 0;
+
+ /** Return the provisioning status for a worker by its orchestrator ID
+ * (e.g. "horde-{LeaseId}"). Returns Unknown if the ID is not recognized. */
+ virtual AgentProvisioningStatus GetAgentStatus(std::string_view /*WorkerId*/) const { return AgentProvisioningStatus::Unknown; }
+};
+
+} // namespace zen::compute
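
Review note: to make the contract of `IProvisionerStateProvider` concrete, here is a minimal sketch of an in-memory implementation. The interface is copied from the new header so the example compiles on its own. `StaticProvisioner`, `AddAgent`, `MarkDraining`, and the fixed `kCoresPerAgent` value are hypothetical, and the way estimated and active core counts are derived here is an assumption, not the behaviour of the Horde or Nomad provisioners.

```cpp
#include <atomic>
#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>

namespace zen::compute {

enum class AgentProvisioningStatus
{
    Unknown,
    Active,
    Draining,
};

class IProvisionerStateProvider
{
public:
    virtual ~IProvisionerStateProvider() = default;

    virtual std::string_view GetName() const = 0;
    virtual uint32_t         GetTargetCoreCount() const = 0;
    virtual uint32_t         GetEstimatedCoreCount() const = 0;
    virtual uint32_t         GetActiveCoreCount() const = 0;
    virtual uint32_t         GetAgentCount() const = 0;
    virtual uint32_t         GetDrainingAgentCount() const { return 0; }
    virtual void             SetTargetCoreCount(uint32_t Count) = 0;

    virtual AgentProvisioningStatus GetAgentStatus(std::string_view /*WorkerId*/) const { return AgentProvisioningStatus::Unknown; }
};

}  // namespace zen::compute

// Hypothetical provider that tracks agents in memory. Only the target core count
// is atomic; the agent map is not synchronized, so this is single-threaded only.
class StaticProvisioner final : public zen::compute::IProvisionerStateProvider
{
public:
    using Status = zen::compute::AgentProvisioningStatus;

    static constexpr uint32_t kCoresPerAgent = 8;  // assumed fixed size per agent

    void AddAgent(std::string_view WorkerId) { m_Agents[std::string(WorkerId)] = Status::Active; }

    void MarkDraining(std::string_view WorkerId)
    {
        auto It = m_Agents.find(std::string(WorkerId));
        if (It != m_Agents.end())
        {
            It->second = Status::Draining;
        }
    }

    std::string_view GetName() const override { return "static"; }
    uint32_t         GetTargetCoreCount() const override { return m_TargetCores.load(); }
    uint32_t         GetEstimatedCoreCount() const override { return GetAgentCount() * kCoresPerAgent; }
    uint32_t         GetActiveCoreCount() const override { return (GetAgentCount() - GetDrainingAgentCount()) * kCoresPerAgent; }
    uint32_t         GetAgentCount() const override { return static_cast<uint32_t>(m_Agents.size()); }

    uint32_t GetDrainingAgentCount() const override
    {
        uint32_t Count = 0;
        for (const auto& [Id, AgentStatus] : m_Agents)
        {
            Count += (AgentStatus == Status::Draining) ? 1u : 0u;
        }
        return Count;
    }

    void SetTargetCoreCount(uint32_t Count) override { m_TargetCores.store(Count); }

    Status GetAgentStatus(std::string_view WorkerId) const override
    {
        auto It = m_Agents.find(std::string(WorkerId));
        return It != m_Agents.end() ? It->second : Status::Unknown;
    }

private:
    std::atomic<uint32_t>                   m_TargetCores{0};
    std::unordered_map<std::string, Status> m_Agents;
};
```

Because `GetDrainingAgentCount` and `GetAgentStatus` have default bodies in the interface, a provisioner without drain support only needs to override the pure-virtual members.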