aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/include
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-04 14:13:46 +0100
committerGitHub Enterprise <[email protected]>2026-03-04 14:13:46 +0100
commit0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch)
tree074575ba6ea259044a179eab0bb396d37268fb09 /src/zenhorde/include
parentnative xmake toolchain definition for UE-clang (#805) (diff)
downloadzen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz
zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenhorde/include')
-rw-r--r--src/zenhorde/include/zenhorde/hordeclient.h116
-rw-r--r--src/zenhorde/include/zenhorde/hordeconfig.h62
-rw-r--r--src/zenhorde/include/zenhorde/hordeprovisioner.h110
-rw-r--r--src/zenhorde/include/zenhorde/zenhorde.h9
4 files changed, 297 insertions, 0 deletions
diff --git a/src/zenhorde/include/zenhorde/hordeclient.h b/src/zenhorde/include/zenhorde/hordeclient.h
new file mode 100644
index 000000000..201d68b83
--- /dev/null
+++ b/src/zenhorde/include/zenhorde/hordeclient.h
@@ -0,0 +1,116 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhorde/hordeconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace zen {
+class HttpClient;
+}
+
+namespace zen::horde {
+
+static constexpr size_t NonceSize = 64;
+static constexpr size_t KeySize = 32;
+
+/** Port mapping information returned by Horde for a provisioned machine. */
+struct PortInfo
+{
+ uint16_t Port = 0;
+ uint16_t AgentPort = 0;
+};
+
+/** Describes a provisioned compute machine returned by the Horde API.
+ *
+ * Contains the network address, encryption credentials, and capabilities
+ * needed to establish a compute transport connection to the machine.
+ */
+struct MachineInfo
+{
+ std::string Ip;
+ ConnectionMode Mode = ConnectionMode::Direct;
+ std::string ConnectionAddress; ///< Relay/tunnel address (used when Mode != Direct)
+ uint16_t Port = 0;
+ uint16_t LogicalCores = 0;
+ Encryption EncryptionMode = Encryption::None;
+ uint8_t Nonce[NonceSize] = {}; ///< 64-byte nonce sent during TCP handshake
+ uint8_t Key[KeySize] = {}; ///< 32-byte AES key (when EncryptionMode == AES)
+ bool IsWindows = false;
+ std::string LeaseId;
+
+ std::map<std::string, PortInfo> Ports;
+
+ /** Return the address to connect to, accounting for connection mode. */
+ const std::string& GetConnectionAddress() const { return Mode == ConnectionMode::Relay ? ConnectionAddress : Ip; }
+
+ /** Return the port to connect to, accounting for connection mode and port mapping. */
+ uint16_t GetConnectionPort() const
+ {
+ if (Mode == ConnectionMode::Relay)
+ {
+ auto It = Ports.find("_horde_compute");
+ if (It != Ports.end())
+ {
+ return It->second.Port;
+ }
+ }
+ return Port;
+ }
+
+ bool IsValid() const { return !Ip.empty() && Port != 0xFFFF; }
+};
+
+/** Result of cluster auto-resolution via the Horde API. */
+struct ClusterInfo
+{
+ std::string ClusterId = "default";
+};
+
+/** HTTP client for the Horde compute REST API.
+ *
+ * Handles cluster resolution and machine provisioning requests. Each call
+ * is synchronous and returns success/failure. Thread safety: individual
+ * methods are not thread-safe; callers must synchronize access.
+ */
+class HordeClient
+{
+public:
+ explicit HordeClient(const HordeConfig& Config);
+ ~HordeClient();
+
+ HordeClient(const HordeClient&) = delete;
+ HordeClient& operator=(const HordeClient&) = delete;
+
+ /** Initialize the underlying HTTP client. Must be called before other methods. */
+ bool Initialize();
+
+ /** Build the JSON request body for cluster resolution and machine requests.
+ * Encodes pool, condition, connection mode, encryption, and port requirements. */
+ std::string BuildRequestBody() const;
+
+ /** Resolve the best cluster for the given request via POST /api/v2/compute/_cluster. */
+ bool ResolveCluster(const std::string& RequestBody, ClusterInfo& OutCluster);
+
+ /** Request a compute machine from the given cluster via POST /api/v2/compute/{clusterId}.
+ * On success, populates OutMachine with connection details and credentials. */
+ bool RequestMachine(const std::string& RequestBody, const std::string& ClusterId, MachineInfo& OutMachine);
+
+ LoggerRef Log() { return m_Log; }
+
+private:
+ bool ParseHexBytes(std::string_view Hex, uint8_t* Out, size_t OutSize);
+
+ HordeConfig m_Config;
+ std::unique_ptr<zen::HttpClient> m_Http;
+ LoggerRef m_Log;
+};
+
+} // namespace zen::horde
diff --git a/src/zenhorde/include/zenhorde/hordeconfig.h b/src/zenhorde/include/zenhorde/hordeconfig.h
new file mode 100644
index 000000000..dd70f9832
--- /dev/null
+++ b/src/zenhorde/include/zenhorde/hordeconfig.h
@@ -0,0 +1,62 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhorde/zenhorde.h>
+
+#include <string>
+
+namespace zen::horde {
+
+/** Transport connection mode for Horde compute agents. */
+enum class ConnectionMode
+{
+ Direct, ///< Connect directly to the agent IP
+ Tunnel, ///< Connect through a Horde tunnel relay
+ Relay, ///< Connect through a Horde relay with port mapping
+};
+
+/** Transport encryption mode for Horde compute channels. */
+enum class Encryption
+{
+ None, ///< No encryption
+ AES, ///< AES-256-GCM encryption (required for Relay mode)
+};
+
+/** Configuration for connecting to an Epic Horde compute cluster.
+ *
+ * Specifies the Horde server URL, authentication token, pool selection,
+ * connection mode, and resource limits. Used by HordeClient and HordeProvisioner.
+ */
+struct HordeConfig
+{
+ static constexpr const char* ClusterDefault = "default";
+ static constexpr const char* ClusterAuto = "_auto";
+
+ bool Enabled = false; ///< Whether Horde provisioning is active
+ std::string ServerUrl; ///< Horde server base URL (e.g. "https://horde.example.com")
+ std::string AuthToken; ///< Authentication token for the Horde API
+ std::string Pool; ///< Pool name to request machines from
+ std::string Cluster = ClusterDefault; ///< Cluster ID, or "_auto" to auto-resolve
+ std::string Condition; ///< Agent filter expression for machine selection
+ std::string HostAddress; ///< Address that provisioned agents use to connect back to us
+ std::string BinariesPath; ///< Path to directory containing zenserver binary for remote upload
+ uint16_t ZenServicePort = 8558; ///< Port number that provisioned agents should forward to us for Zen service communication
+
+ int MaxCores = 2048;
+ bool AllowWine = true; ///< Allow running Windows binaries under Wine on Linux agents
+ ConnectionMode Mode = ConnectionMode::Direct;
+ Encryption EncryptionMode = Encryption::None;
+
+ /** Validate the configuration. Returns false if the configuration is invalid
+ * (e.g. Relay mode without AES encryption). */
+ bool Validate() const;
+};
+
+const char* ToString(ConnectionMode Mode);
+const char* ToString(Encryption Enc);
+
+bool FromString(ConnectionMode& OutMode, std::string_view Str);
+bool FromString(Encryption& OutEnc, std::string_view Str);
+
+} // namespace zen::horde
diff --git a/src/zenhorde/include/zenhorde/hordeprovisioner.h b/src/zenhorde/include/zenhorde/hordeprovisioner.h
new file mode 100644
index 000000000..4e2e63bbd
--- /dev/null
+++ b/src/zenhorde/include/zenhorde/hordeprovisioner.h
@@ -0,0 +1,110 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhorde/hordeconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <atomic>
+#include <cstdint>
+#include <filesystem>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+namespace zen::horde {
+
+class HordeClient;
+
+/** Snapshot of the current provisioning state, returned by HordeProvisioner::GetStats(). */
+struct ProvisioningStats
+{
+ uint32_t TargetCoreCount = 0; ///< Requested number of cores (clamped to MaxCores)
+ uint32_t EstimatedCoreCount = 0; ///< Cores expected once pending requests complete
+ uint32_t ActiveCoreCount = 0; ///< Cores on machines that are currently running zenserver
+ uint32_t AgentsActive = 0; ///< Number of agents with a running remote process
+ uint32_t AgentsRequesting = 0; ///< Number of agents currently requesting a machine from Horde
+};
+
+/** Multi-agent lifecycle manager for Horde worker provisioning.
+ *
+ * Provisions remote compute workers by requesting machines from the Horde API,
+ * connecting via the Horde compute transport protocol, uploading the zenserver
+ * binary, and executing it remotely. Each provisioned machine runs zenserver
+ * in compute mode, which announces itself back to the orchestrator.
+ *
+ * Spawns one thread per agent. Each thread handles the full lifecycle:
+ * HTTP request -> TCP connect -> nonce handshake -> optional AES encryption ->
+ * channel setup -> binary upload -> remote execution -> poll until exit.
+ *
+ * Thread safety: SetTargetCoreCount and GetStats may be called from any thread.
+ */
+class HordeProvisioner
+{
+public:
+ /** Construct a provisioner.
+ * @param Config Horde connection and pool configuration.
+ * @param BinariesPath Directory containing the zenserver binary to upload.
+ * @param WorkingDir Local directory for bundle staging and working files.
+ * @param OrchestratorEndpoint URL of the orchestrator that remote workers announce to. */
+ HordeProvisioner(const HordeConfig& Config,
+ const std::filesystem::path& BinariesPath,
+ const std::filesystem::path& WorkingDir,
+ std::string_view OrchestratorEndpoint);
+
+ /** Signals all agent threads to exit and joins them. */
+ ~HordeProvisioner();
+
+ HordeProvisioner(const HordeProvisioner&) = delete;
+ HordeProvisioner& operator=(const HordeProvisioner&) = delete;
+
+ /** Set the target number of cores to provision.
+ * Clamped to HordeConfig::MaxCores. Spawns new agent threads if the
+ * estimated core count is below the target. Also joins any finished
+ * agent threads. */
+ void SetTargetCoreCount(uint32_t Count);
+
+ /** Return a snapshot of the current provisioning counters. */
+ ProvisioningStats GetStats() const;
+
+ uint32_t GetActiveCoreCount() const { return m_ActiveCoreCount.load(); }
+ uint32_t GetAgentCount() const;
+
+private:
+ LoggerRef Log() { return m_Log; }
+
+ struct AgentWrapper;
+
+ void RequestAgent();
+ void ThreadAgent(AgentWrapper& Wrapper);
+
+ HordeConfig m_Config;
+ std::filesystem::path m_BinariesPath;
+ std::filesystem::path m_WorkingDir;
+ std::string m_OrchestratorEndpoint;
+
+ std::unique_ptr<HordeClient> m_HordeClient;
+
+ std::mutex m_BundleLock;
+ std::vector<std::pair<std::string, std::filesystem::path>> m_Bundles; ///< (locator, bundleDir) pairs
+ bool m_BundlesCreated = false;
+
+ mutable std::mutex m_AgentsLock;
+ std::vector<std::unique_ptr<AgentWrapper>> m_Agents;
+
+ std::atomic<uint64_t> m_LastRequestFailTime{0};
+ std::atomic<uint32_t> m_TargetCoreCount{0};
+ std::atomic<uint32_t> m_EstimatedCoreCount{0};
+ std::atomic<uint32_t> m_ActiveCoreCount{0};
+ std::atomic<uint32_t> m_AgentsActive{0};
+ std::atomic<uint32_t> m_AgentsRequesting{0};
+ std::atomic<bool> m_AskForAgents{true};
+
+ LoggerRef m_Log;
+
+ static constexpr uint32_t EstimatedCoresPerAgent = 32;
+};
+
+} // namespace zen::horde
diff --git a/src/zenhorde/include/zenhorde/zenhorde.h b/src/zenhorde/include/zenhorde/zenhorde.h
new file mode 100644
index 000000000..35147ff75
--- /dev/null
+++ b/src/zenhorde/include/zenhorde/zenhorde.h
@@ -0,0 +1,9 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_HORDE)
+# define ZEN_WITH_HORDE 1
+#endif