aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordeprovisioner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-04 14:13:46 +0100
committerGitHub Enterprise <[email protected]>2026-03-04 14:13:46 +0100
commit0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch)
tree074575ba6ea259044a179eab0bb396d37268fb09 /src/zenhorde/hordeprovisioner.cpp
parentnative xmake toolchain definition for UE-clang (#805) (diff)
downloadzen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz
zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenhorde/hordeprovisioner.cpp')
-rw-r--r--src/zenhorde/hordeprovisioner.cpp367
1 file changed, 367 insertions, 0 deletions
diff --git a/src/zenhorde/hordeprovisioner.cpp b/src/zenhorde/hordeprovisioner.cpp
new file mode 100644
index 000000000..f88c95da2
--- /dev/null
+++ b/src/zenhorde/hordeprovisioner.cpp
@@ -0,0 +1,367 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhorde/hordeclient.h>
+#include <zenhorde/hordeprovisioner.h>
+
+#include "hordeagent.h"
+#include "hordebundle.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+#include <thread>
+
+namespace zen::horde {
+
// Bookkeeping for one provisioning worker thread.
//
// Instances are owned by m_Agents as unique_ptr, so the object's address is
// stable for the lifetime of the thread -- ThreadAgent captures a reference
// to its wrapper. ShouldExit is dual-purpose: it is set externally to request
// a cooperative shutdown, and set by the worker itself (via its exit guard)
// to mark the wrapper as finished and reapable by SetTargetCoreCount().
struct HordeProvisioner::AgentWrapper
{
    std::thread Thread;                   // runs HordeProvisioner::ThreadAgent
    std::atomic<bool> ShouldExit{false};  // stop request / completion flag
};
+
// Constructs a provisioner that spins up remote Horde agents on demand.
//
// Config               - Horde settings (core ceiling via MaxCores, cluster
//                        selection, service port) used by agent requests.
// BinariesPath         - local directory holding the zenserver binaries that
//                        get bundled and uploaded to remote machines.
// WorkingDir           - scratch directory; bundles are staged under the
//                        "horde_bundles" subdirectory.
// OrchestratorEndpoint - optional endpoint forwarded to remote agents as
//                        --coordinator-endpoint (omitted when empty).
//
// The constructor only stores configuration; no threads are started and no
// network activity occurs until SetTargetCoreCount() requests agents.
HordeProvisioner::HordeProvisioner(const HordeConfig& Config,
                                   const std::filesystem::path& BinariesPath,
                                   const std::filesystem::path& WorkingDir,
                                   std::string_view OrchestratorEndpoint)
: m_Config(Config)
, m_BinariesPath(BinariesPath)
, m_WorkingDir(WorkingDir)
, m_OrchestratorEndpoint(OrchestratorEndpoint)
, m_Log(zen::logging::Get("horde.provisioner"))
{
}
+
+HordeProvisioner::~HordeProvisioner()
+{
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ for (auto& Agent : m_Agents)
+ {
+ Agent->ShouldExit.store(true);
+ }
+ for (auto& Agent : m_Agents)
+ {
+ if (Agent->Thread.joinable())
+ {
+ Agent->Thread.join();
+ }
+ }
+}
+
+void
+HordeProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+ ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount");
+
+ m_TargetCoreCount.store(std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)));
+
+ while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+ {
+ if (!m_AskForAgents.load())
+ {
+ return;
+ }
+ RequestAgent();
+ }
+
+ // Clean up finished agent threads
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ for (auto It = m_Agents.begin(); It != m_Agents.end();)
+ {
+ if ((*It)->ShouldExit.load())
+ {
+ if ((*It)->Thread.joinable())
+ {
+ (*It)->Thread.join();
+ }
+ It = m_Agents.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+ProvisioningStats
+HordeProvisioner::GetStats() const
+{
+ ProvisioningStats Stats;
+ Stats.TargetCoreCount = m_TargetCoreCount.load();
+ Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
+ Stats.ActiveCoreCount = m_ActiveCoreCount.load();
+ Stats.AgentsActive = m_AgentsActive.load();
+ Stats.AgentsRequesting = m_AgentsRequesting.load();
+ return Stats;
+}
+
+uint32_t
+HordeProvisioner::GetAgentCount() const
+{
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ return static_cast<uint32_t>(m_Agents.size());
+}
+
+void
+HordeProvisioner::RequestAgent()
+{
+ m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent);
+
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+
+ auto Wrapper = std::make_unique<AgentWrapper>();
+ AgentWrapper& Ref = *Wrapper;
+ Wrapper->Thread = std::thread([this, &Ref] { ThreadAgent(Ref); });
+
+ m_Agents.push_back(std::move(Wrapper));
+}
+
// Worker body for one provisioned Horde agent, run on a dedicated thread
// started by RequestAgent(). Performs, in order: one-time bundle/client
// setup, backoff after a failed machine request, machine acquisition,
// connection + binary upload, remote zenserver launch, and finally a poll
// loop that keeps the session alive until the agent drops or shutdown is
// requested.
//
// Wrapper - this thread's bookkeeping record; Wrapper.ShouldExit is both the
//           external stop request and (set by the exit guard below) the
//           "finished, please reap me" signal read by SetTargetCoreCount().
void
HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
{
    ZEN_TRACE_CPU("HordeProvisioner::ThreadAgent");

    // Monotonic counter used only to give each worker a unique thread name.
    static std::atomic<uint32_t> ThreadIndex{0};
    const uint32_t CurrentIndex = ThreadIndex.fetch_add(1);

    zen::SetCurrentThreadName(fmt::format("horde_agent_{}", CurrentIndex));

    std::unique_ptr<HordeAgent> Agent;
    uint32_t MachineCoreCount = 0;

    // Runs on every exit path: close the connection if one was established
    // and mark this wrapper finished so SetTargetCoreCount() can reap it.
    auto _ = MakeGuard([&] {
        if (Agent)
        {
            Agent->CloseConnection();
        }
        Wrapper.ShouldExit.store(true);
    });

    {
        // EstimatedCoreCount is incremented speculatively when the agent is requested
        // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision.
        // This guard reverses that speculative bump when the setup scope ends,
        // by which point the real machine core count (if any) has been added.
        auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); });

        {
            ZEN_TRACE_CPU("HordeProvisioner::CreateBundles");

            // One-time, lazily-initialized shared state (bundle + HTTP
            // client); serialized across workers by m_BundleLock.
            std::lock_guard<std::mutex> BundleLock(m_BundleLock);

            if (!m_BundlesCreated)
            {
                const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";

                // Per-platform list of binaries to ship to the remote
                // machine; the bool marks the file as optional/debug-info.
                // TODO confirm -- the BundleFile second argument's meaning
                // is not visible from this file.
                std::vector<BundleFile> Files;

#if ZEN_PLATFORM_WINDOWS
                Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
#elif ZEN_PLATFORM_LINUX
                Files.emplace_back(m_BinariesPath / "zenserver", false);
                Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
#elif ZEN_PLATFORM_MAC
                Files.emplace_back(m_BinariesPath / "zenserver", false);
#endif

                BundleResult Result;
                if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
                {
                    // Without a bundle no agent can ever be provisioned;
                    // disable further requests process-wide.
                    ZEN_WARN("failed to create bundle, cannot provision any agents!");
                    m_AskForAgents.store(false);
                    return;
                }

                m_Bundles.emplace_back(Result.Locator, Result.BundleDir);
                m_BundlesCreated = true;
            }

            if (!m_HordeClient)
            {
                m_HordeClient = std::make_unique<HordeClient>(m_Config);
                if (!m_HordeClient->Initialize())
                {
                    // Same as above: unrecoverable, stop asking for agents.
                    ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!");
                    m_AskForAgents.store(false);
                    return;
                }
            }
        }

        // Another worker may have disabled provisioning while we waited on
        // the bundle lock.
        if (!m_AskForAgents.load())
        {
            return;
        }

        m_AgentsRequesting.fetch_add(1);
        auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); });

        // Simple backoff: if the last machine request failed, wait up to 5 seconds
        // before trying again.
        //
        // Note however that it's possible that multiple threads enter this code at
        // the same time if multiple agents are requested at once, and they will all
        // see the same last failure time and back off accordingly. We might want to
        // use a semaphore or similar to limit the number of concurrent requests.

        if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0)
        {
            // m_LastRequestFailTime stores raw steady_clock ticks; the
            // division below assumes the tick period is nanoseconds.
            auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
            const uint64_t ElapsedNs = Now - LastFail;
            const uint64_t ElapsedMs = ElapsedNs / 1'000'000;
            if (ElapsedMs < 5000)
            {
                // Sleep in 100 ms slices so a shutdown request is noticed
                // promptly instead of after the full backoff window.
                const uint64_t WaitMs = 5000 - ElapsedMs;
                for (uint64_t Waited = 0; Waited < WaitMs && !Wrapper.ShouldExit.load(); Waited += 100)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                }

                if (Wrapper.ShouldExit.load())
                {
                    return;
                }
            }
        }

        // The target may have been met (or lowered) while we backed off.
        if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load())
        {
            return;
        }

        std::string RequestBody = m_HordeClient->BuildRequestBody();

        // Resolve cluster if needed
        std::string ClusterId = m_Config.Cluster;
        if (ClusterId == HordeConfig::ClusterAuto)
        {
            ClusterInfo Cluster;
            if (!m_HordeClient->ResolveCluster(RequestBody, Cluster))
            {
                ZEN_WARN("failed to resolve cluster");
                m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
                return;
            }
            ClusterId = Cluster.ClusterId;
        }

        MachineInfo Machine;
        if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid())
        {
            // Record the failure time so subsequent attempts back off.
            m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
            return;
        }

        // Success clears the backoff window for all workers.
        m_LastRequestFailTime.store(0);

        if (Wrapper.ShouldExit.load())
        {
            return;
        }

        // Connect to agent and perform handshake
        Agent = std::make_unique<HordeAgent>(Machine);
        if (!Agent->IsValid())
        {
            ZEN_WARN("agent creation failed for {}:{}", Machine.GetConnectionAddress(), Machine.GetConnectionPort());
            return;
        }

        if (!Agent->BeginCommunication())
        {
            ZEN_WARN("BeginCommunication failed");
            return;
        }

        // Upload every staged bundle; in practice m_Bundles holds the single
        // bundle created above. NOTE(review): m_Bundles is read here without
        // m_BundleLock -- safe today because it is only appended under the
        // lock before m_BundlesCreated flips; confirm if that changes.
        for (auto& [Locator, BundleDir] : m_Bundles)
        {
            if (Wrapper.ShouldExit.load())
            {
                return;
            }

            if (!Agent->UploadBinaries(BundleDir, Locator))
            {
                ZEN_WARN("UploadBinaries failed");
                return;
            }
        }

        if (Wrapper.ShouldExit.load())
        {
            return;
        }

        // Build command line for remote zenserver
        std::vector<std::string> ArgStrings;
        ArgStrings.push_back("compute");
        ArgStrings.push_back("--http=asio");

        // TEMP HACK - these should be made fully dynamic
        // these are currently here to allow spawning the compute agent locally
        // for debugging purposes (i.e with a local Horde Server+Agent setup)
        ArgStrings.push_back(fmt::format("--port={}", m_Config.ZenServicePort));
        ArgStrings.push_back("--data-dir=c:\\temp\\123");

        if (!m_OrchestratorEndpoint.empty())
        {
            ExtendableStringBuilder<256> CoordArg;
            CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint;
            ArgStrings.emplace_back(CoordArg.ToView());
        }

        {
            // Unique instance id derived from the Horde lease.
            ExtendableStringBuilder<128> IdArg;
            IdArg << "--instance-id=horde-" << Machine.LeaseId;
            ArgStrings.emplace_back(IdArg.ToView());
        }

        // Execute() takes a C-style argv; the pointers borrow from
        // ArgStrings, which must outlive the call (it does -- same scope).
        std::vector<const char*> Args;
        Args.reserve(ArgStrings.size());
        for (const std::string& Arg : ArgStrings)
        {
            Args.push_back(Arg.c_str());
        }

#if ZEN_PLATFORM_WINDOWS
        // Our Windows binary runs under Wine when the remote host isn't Windows.
        const bool UseWine = !Machine.IsWindows;
        const char* AppName = "zenserver.exe";
#else
        const bool UseWine = false;
        const char* AppName = "zenserver";
#endif

        Agent->Execute(AppName, Args.data(), Args.size(), nullptr, nullptr, 0, UseWine);

        ZEN_INFO("remote execution started on [{}:{}] lease={}",
                 Machine.GetConnectionAddress(),
                 Machine.GetConnectionPort(),
                 Machine.LeaseId);

        // The agent is now live: account for its real core count. The
        // speculative EstimatedCoresPerAgent bump is subtracted by the `$`
        // guard when this scope closes, leaving the real count in place.
        MachineCoreCount = Machine.LogicalCores;
        m_EstimatedCoreCount.fetch_add(MachineCoreCount);
        m_ActiveCoreCount.fetch_add(MachineCoreCount);
        m_AgentsActive.fetch_add(1);
    }

    // Agent poll loop

    // Reverses the accounting above when the session ends for any reason.
    auto ActiveGuard = MakeGuard([&]() {
        m_EstimatedCoreCount.fetch_sub(MachineCoreCount);
        m_ActiveCoreCount.fetch_sub(MachineCoreCount);
        m_AgentsActive.fetch_sub(1);
    });

    // Keep the session alive until the agent fails/disconnects or shutdown
    // is requested; Poll() services agent traffic without logging output.
    while (Agent->IsValid() && !Wrapper.ShouldExit.load())
    {
        const bool LogOutput = false;
        if (!Agent->Poll(LogOutput))
        {
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
+
+} // namespace zen::horde