diff options
| author | Stefan Boberg <[email protected]> | 2026-03-04 14:13:46 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-04 14:13:46 +0100 |
| commit | 0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch) | |
| tree | 074575ba6ea259044a179eab0bb396d37268fb09 /src/zenhorde/include | |
| parent | native xmake toolchain definition for UE-clang (#805) (diff) | |
| download | zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip | |
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support
- Horde & Nomad provisioning for development and testing
- Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API
- Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates
- Some security hardening
- Improved scalability and `zen exec` command
Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenhorde/include')
| -rw-r--r-- | src/zenhorde/include/zenhorde/hordeclient.h | 116 | ||||
| -rw-r--r-- | src/zenhorde/include/zenhorde/hordeconfig.h | 62 | ||||
| -rw-r--r-- | src/zenhorde/include/zenhorde/hordeprovisioner.h | 110 | ||||
| -rw-r--r-- | src/zenhorde/include/zenhorde/zenhorde.h | 9 |
4 files changed, 297 insertions, 0 deletions
diff --git a/src/zenhorde/include/zenhorde/hordeclient.h b/src/zenhorde/include/zenhorde/hordeclient.h new file mode 100644 index 000000000..201d68b83 --- /dev/null +++ b/src/zenhorde/include/zenhorde/hordeclient.h @@ -0,0 +1,116 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhorde/hordeconfig.h> + +#include <zencore/logbase.h> + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <vector> + +namespace zen { +class HttpClient; +} + +namespace zen::horde { + +static constexpr size_t NonceSize = 64; +static constexpr size_t KeySize = 32; + +/** Port mapping information returned by Horde for a provisioned machine. */ +struct PortInfo +{ + uint16_t Port = 0; + uint16_t AgentPort = 0; +}; + +/** Describes a provisioned compute machine returned by the Horde API. + * + * Contains the network address, encryption credentials, and capabilities + * needed to establish a compute transport connection to the machine. + */ +struct MachineInfo +{ + std::string Ip; + ConnectionMode Mode = ConnectionMode::Direct; + std::string ConnectionAddress; ///< Relay/tunnel address (used when Mode != Direct) + uint16_t Port = 0; + uint16_t LogicalCores = 0; + Encryption EncryptionMode = Encryption::None; + uint8_t Nonce[NonceSize] = {}; ///< 64-byte nonce sent during TCP handshake + uint8_t Key[KeySize] = {}; ///< 32-byte AES key (when EncryptionMode == AES) + bool IsWindows = false; + std::string LeaseId; + + std::map<std::string, PortInfo> Ports; + + /** Return the address to connect to, accounting for connection mode. */ + const std::string& GetConnectionAddress() const { return Mode == ConnectionMode::Relay ? ConnectionAddress : Ip; } + + /** Return the port to connect to, accounting for connection mode and port mapping. */ + uint16_t GetConnectionPort() const + { + if (Mode == ConnectionMode::Relay) + { + auto It = Ports.find("_horde_compute"); + if (It != Ports.end()) + { + return It->second.Port; + } + } + return Port; + } + + bool IsValid() const { return !Ip.empty() && Port != 0xFFFF; } +}; + +/** Result of cluster auto-resolution via the Horde API. */ +struct ClusterInfo +{ + std::string ClusterId = "default"; +}; + +/** HTTP client for the Horde compute REST API. + * + * Handles cluster resolution and machine provisioning requests. Each call + * is synchronous and returns success/failure. Thread safety: individual + * methods are not thread-safe; callers must synchronize access. + */ +class HordeClient +{ +public: + explicit HordeClient(const HordeConfig& Config); + ~HordeClient(); + + HordeClient(const HordeClient&) = delete; + HordeClient& operator=(const HordeClient&) = delete; + + /** Initialize the underlying HTTP client. Must be called before other methods. */ + bool Initialize(); + + /** Build the JSON request body for cluster resolution and machine requests. + * Encodes pool, condition, connection mode, encryption, and port requirements. */ + std::string BuildRequestBody() const; + + /** Resolve the best cluster for the given request via POST /api/v2/compute/_cluster. */ + bool ResolveCluster(const std::string& RequestBody, ClusterInfo& OutCluster); + + /** Request a compute machine from the given cluster via POST /api/v2/compute/{clusterId}. + * On success, populates OutMachine with connection details and credentials. */ + bool RequestMachine(const std::string& RequestBody, const std::string& ClusterId, MachineInfo& OutMachine); + + LoggerRef Log() { return m_Log; } + +private: + bool ParseHexBytes(std::string_view Hex, uint8_t* Out, size_t OutSize); + + HordeConfig m_Config; + std::unique_ptr<zen::HttpClient> m_Http; + LoggerRef m_Log; +}; + +} // namespace zen::horde diff --git a/src/zenhorde/include/zenhorde/hordeconfig.h b/src/zenhorde/include/zenhorde/hordeconfig.h new file mode 100644 index 000000000..dd70f9832 --- /dev/null +++ b/src/zenhorde/include/zenhorde/hordeconfig.h @@ -0,0 +1,62 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhorde/zenhorde.h> + +#include <string> + +namespace zen::horde { + +/** Transport connection mode for Horde compute agents. */ +enum class ConnectionMode +{ + Direct, ///< Connect directly to the agent IP + Tunnel, ///< Connect through a Horde tunnel relay + Relay, ///< Connect through a Horde relay with port mapping +}; + +/** Transport encryption mode for Horde compute channels. */ +enum class Encryption +{ + None, ///< No encryption + AES, ///< AES-256-GCM encryption (required for Relay mode) +}; + +/** Configuration for connecting to an Epic Horde compute cluster. + * + * Specifies the Horde server URL, authentication token, pool selection, + * connection mode, and resource limits. Used by HordeClient and HordeProvisioner. + */ +struct HordeConfig +{ + static constexpr const char* ClusterDefault = "default"; + static constexpr const char* ClusterAuto = "_auto"; + + bool Enabled = false; ///< Whether Horde provisioning is active + std::string ServerUrl; ///< Horde server base URL (e.g. "https://horde.example.com") + std::string AuthToken; ///< Authentication token for the Horde API + std::string Pool; ///< Pool name to request machines from + std::string Cluster = ClusterDefault; ///< Cluster ID, or "_auto" to auto-resolve + std::string Condition; ///< Agent filter expression for machine selection + std::string HostAddress; ///< Address that provisioned agents use to connect back to us + std::string BinariesPath; ///< Path to directory containing zenserver binary for remote upload + uint16_t ZenServicePort = 8558; ///< Port number that provisioned agents should forward to us for Zen service communication + + int MaxCores = 2048; + bool AllowWine = true; ///< Allow running Windows binaries under Wine on Linux agents + ConnectionMode Mode = ConnectionMode::Direct; + Encryption EncryptionMode = Encryption::None; + + /** Validate the configuration. Returns false if the configuration is invalid + * (e.g. Relay mode without AES encryption). */ + bool Validate() const; +}; + +const char* ToString(ConnectionMode Mode); +const char* ToString(Encryption Enc); + +bool FromString(ConnectionMode& OutMode, std::string_view Str); +bool FromString(Encryption& OutEnc, std::string_view Str); + +} // namespace zen::horde diff --git a/src/zenhorde/include/zenhorde/hordeprovisioner.h b/src/zenhorde/include/zenhorde/hordeprovisioner.h new file mode 100644 index 000000000..4e2e63bbd --- /dev/null +++ b/src/zenhorde/include/zenhorde/hordeprovisioner.h @@ -0,0 +1,110 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhorde/hordeconfig.h> + +#include <zencore/logbase.h> + +#include <atomic> +#include <cstdint> +#include <filesystem> +#include <memory> +#include <mutex> +#include <string> +#include <vector> + +namespace zen::horde { + +class HordeClient; + +/** Snapshot of the current provisioning state, returned by HordeProvisioner::GetStats(). */ +struct ProvisioningStats +{ + uint32_t TargetCoreCount = 0; ///< Requested number of cores (clamped to MaxCores) + uint32_t EstimatedCoreCount = 0; ///< Cores expected once pending requests complete + uint32_t ActiveCoreCount = 0; ///< Cores on machines that are currently running zenserver + uint32_t AgentsActive = 0; ///< Number of agents with a running remote process + uint32_t AgentsRequesting = 0; ///< Number of agents currently requesting a machine from Horde +}; + +/** Multi-agent lifecycle manager for Horde worker provisioning. + * + * Provisions remote compute workers by requesting machines from the Horde API, + * connecting via the Horde compute transport protocol, uploading the zenserver + * binary, and executing it remotely. Each provisioned machine runs zenserver + * in compute mode, which announces itself back to the orchestrator. + * + * Spawns one thread per agent. Each thread handles the full lifecycle: + * HTTP request -> TCP connect -> nonce handshake -> optional AES encryption -> + * channel setup -> binary upload -> remote execution -> poll until exit. + * + * Thread safety: SetTargetCoreCount and GetStats may be called from any thread. + */ +class HordeProvisioner +{ +public: + /** Construct a provisioner. + * @param Config Horde connection and pool configuration. + * @param BinariesPath Directory containing the zenserver binary to upload. + * @param WorkingDir Local directory for bundle staging and working files. + * @param OrchestratorEndpoint URL of the orchestrator that remote workers announce to. */ + HordeProvisioner(const HordeConfig& Config, + const std::filesystem::path& BinariesPath, + const std::filesystem::path& WorkingDir, + std::string_view OrchestratorEndpoint); + + /** Signals all agent threads to exit and joins them. */ + ~HordeProvisioner(); + + HordeProvisioner(const HordeProvisioner&) = delete; + HordeProvisioner& operator=(const HordeProvisioner&) = delete; + + /** Set the target number of cores to provision. + * Clamped to HordeConfig::MaxCores. Spawns new agent threads if the + * estimated core count is below the target. Also joins any finished + * agent threads. */ + void SetTargetCoreCount(uint32_t Count); + + /** Return a snapshot of the current provisioning counters. */ + ProvisioningStats GetStats() const; + + uint32_t GetActiveCoreCount() const { return m_ActiveCoreCount.load(); } + uint32_t GetAgentCount() const; + +private: + LoggerRef Log() { return m_Log; } + + struct AgentWrapper; + + void RequestAgent(); + void ThreadAgent(AgentWrapper& Wrapper); + + HordeConfig m_Config; + std::filesystem::path m_BinariesPath; + std::filesystem::path m_WorkingDir; + std::string m_OrchestratorEndpoint; + + std::unique_ptr<HordeClient> m_HordeClient; + + std::mutex m_BundleLock; + std::vector<std::pair<std::string, std::filesystem::path>> m_Bundles; ///< (locator, bundleDir) pairs + bool m_BundlesCreated = false; + + mutable std::mutex m_AgentsLock; + std::vector<std::unique_ptr<AgentWrapper>> m_Agents; + + std::atomic<uint64_t> m_LastRequestFailTime{0}; + std::atomic<uint32_t> m_TargetCoreCount{0}; + std::atomic<uint32_t> m_EstimatedCoreCount{0}; + std::atomic<uint32_t> m_ActiveCoreCount{0}; + std::atomic<uint32_t> m_AgentsActive{0}; + std::atomic<uint32_t> m_AgentsRequesting{0}; + std::atomic<bool> m_AskForAgents{true}; + + LoggerRef m_Log; + + static constexpr uint32_t EstimatedCoresPerAgent = 32; +}; + +} // namespace zen::horde diff --git a/src/zenhorde/include/zenhorde/zenhorde.h b/src/zenhorde/include/zenhorde/zenhorde.h new file mode 100644 index 000000000..35147ff75 --- /dev/null +++ b/src/zenhorde/include/zenhorde/zenhorde.h @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#if !defined(ZEN_WITH_HORDE) +# define ZEN_WITH_HORDE 1 +#endif |