diff options
| author | Stefan Boberg <[email protected]> | 2026-03-04 14:13:46 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-04 14:13:46 +0100 |
| commit | 0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch) | |
| tree | 074575ba6ea259044a179eab0bb396d37268fb09 /src/zenhorde/hordeprovisioner.cpp | |
| parent | native xmake toolchain definition for UE-clang (#805) (diff) | |
| download | zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip | |
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support
- Horde & Nomad provisioning for development and testing
- Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API
- Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates
- Some security hardening
- Improved scalability and `zen exec` command
Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenhorde/hordeprovisioner.cpp')
| -rw-r--r-- | src/zenhorde/hordeprovisioner.cpp | 367 |
1 file changed, 367 insertions, 0 deletions
diff --git a/src/zenhorde/hordeprovisioner.cpp b/src/zenhorde/hordeprovisioner.cpp new file mode 100644 index 000000000..f88c95da2 --- /dev/null +++ b/src/zenhorde/hordeprovisioner.cpp @@ -0,0 +1,367 @@ +// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhorde/hordeclient.h>
+#include <zenhorde/hordeprovisioner.h>
+
+#include "hordeagent.h"
+#include "hordebundle.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+#include <thread>
+
+namespace zen::horde {
+
+// Pairs one agent worker thread with the flag used to request its
+// cooperative shutdown. ShouldExit is set by the destructor / reaper and
+// also by the thread itself (via a scope guard in ThreadAgent) once the
+// thread is finished, which marks the wrapper as reapable.
+struct HordeProvisioner::AgentWrapper
+{
+ std::thread Thread;
+ std::atomic<bool> ShouldExit{false};
+};
+
+// Captures configuration, binary/working paths and the orchestrator
+// endpoint. No agents are provisioned until SetTargetCoreCount() is called.
+HordeProvisioner::HordeProvisioner(const HordeConfig& Config,
+ const std::filesystem::path& BinariesPath,
+ const std::filesystem::path& WorkingDir,
+ std::string_view OrchestratorEndpoint)
+: m_Config(Config)
+, m_BinariesPath(BinariesPath)
+, m_WorkingDir(WorkingDir)
+, m_OrchestratorEndpoint(OrchestratorEndpoint)
+, m_Log(zen::logging::Get("horde.provisioner"))
+{
+}
+
+// Signals every agent thread to exit, then joins them all.
+// NOTE(review): m_AgentsLock is held across the join() calls; this is safe
+// as long as agent threads never acquire m_AgentsLock themselves (today
+// ThreadAgent only takes m_BundleLock) — verify if that ever changes.
+HordeProvisioner::~HordeProvisioner()
+{
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ for (auto& Agent : m_Agents)
+ {
+ Agent->ShouldExit.store(true);
+ }
+ for (auto& Agent : m_Agents)
+ {
+ if (Agent->Thread.joinable())
+ {
+ Agent->Thread.join();
+ }
+ }
+}
+
+// Sets the desired total core count (clamped to m_Config.MaxCores) and
+// requests additional agents until the speculative estimate catches up.
+// Each RequestAgent() call bumps m_EstimatedCoreCount by
+// EstimatedCoresPerAgent, which guarantees the while loop terminates.
+// Finally reaps any agent threads that have already flagged themselves
+// as finished (ShouldExit set by ThreadAgent's exit guard).
+void
+HordeProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+ ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount");
+
+ m_TargetCoreCount.store(std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)));
+
+ while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+ {
+ // m_AskForAgents is cleared by ThreadAgent on unrecoverable setup
+ // failures (bundle creation / client init), so stop asking entirely.
+ if (!m_AskForAgents.load())
+ {
+ return;
+ }
+ RequestAgent();
+ }
+
+ // Clean up finished agent threads
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ for (auto It = m_Agents.begin(); It != m_Agents.end();)
+ {
+ if ((*It)->ShouldExit.load())
+ {
+ if ((*It)->Thread.joinable())
+ {
+ (*It)->Thread.join();
+ }
+ It = m_Agents.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+// Returns a snapshot of the provisioning counters. Each counter is loaded
+// independently, so the snapshot as a whole is not atomic — values may be
+// mutually inconsistent under concurrent provisioning.
+ProvisioningStats
+HordeProvisioner::GetStats() const
+{
+ ProvisioningStats Stats;
+ Stats.TargetCoreCount = m_TargetCoreCount.load();
+ Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
+ Stats.ActiveCoreCount = m_ActiveCoreCount.load();
+ Stats.AgentsActive = m_AgentsActive.load();
+ Stats.AgentsRequesting = m_AgentsRequesting.load();
+ return Stats;
+}
+
+// Number of agent wrappers currently tracked (including ones whose thread
+// has finished but has not yet been reaped by SetTargetCoreCount).
+uint32_t
+HordeProvisioner::GetAgentCount() const
+{
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ return static_cast<uint32_t>(m_Agents.size());
+}
+
+// Speculatively accounts EstimatedCoresPerAgent cores, then spawns a
+// worker thread that runs the full agent lifecycle (ThreadAgent).
+// Capturing Ref by reference is safe: the AgentWrapper is heap-allocated
+// and owned by the unique_ptr stored in m_Agents, so its address is stable
+// for the lifetime of the thread.
+void
+HordeProvisioner::RequestAgent()
+{
+ m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent);
+
+ std::lock_guard<std::mutex> Lock(m_AgentsLock);
+
+ auto Wrapper = std::make_unique<AgentWrapper>();
+ AgentWrapper& Ref = *Wrapper;
+ Wrapper->Thread = std::thread([this, &Ref] { ThreadAgent(Ref); });
+
+ m_Agents.push_back(std::move(Wrapper));
+}
+
+// Worker-thread body for one agent. Lifecycle, in order:
+// 1. one-time bundle creation + Horde client init (under m_BundleLock)
+// 2. backoff if the last machine request failed less than 5s ago
+// 3. request a machine (resolving the cluster first if set to auto)
+// 4. connect, handshake, upload the binary bundles
+// 5. launch the remote zenserver and poll it until it dies or we are
+//    asked to exit.
+// On any exit path, the outermost guard closes the agent connection and
+// sets Wrapper.ShouldExit so the reaper can join and erase this wrapper.
+void
+HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
+{
+ ZEN_TRACE_CPU("HordeProvisioner::ThreadAgent");
+
+ static std::atomic<uint32_t> ThreadIndex{0};
+ const uint32_t CurrentIndex = ThreadIndex.fetch_add(1);
+
+ zen::SetCurrentThreadName(fmt::format("horde_agent_{}", CurrentIndex));
+
+ std::unique_ptr<HordeAgent> Agent;
+ uint32_t MachineCoreCount = 0;
+
+ // Runs last on every exit path: tear down the connection (if any) and
+ // mark this wrapper reapable.
+ auto _ = MakeGuard([&] {
+ if (Agent)
+ {
+ Agent->CloseConnection();
+ }
+ Wrapper.ShouldExit.store(true);
+ });
+
+ {
+ // EstimatedCoreCount is incremented speculatively when the agent is requested
+ // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision.
+ // This guard releases the speculative estimate when the inner scope
+ // ends; on success the real MachineCoreCount has been added by then,
+ // so the net effect is swapping the estimate for the actual count.
+ auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); });
+
+ {
+ ZEN_TRACE_CPU("HordeProvisioner::CreateBundles");
+
+ std::lock_guard<std::mutex> BundleLock(m_BundleLock);
+
+ // First thread through here builds the upload bundle(s); later
+ // threads reuse them. m_Bundles is never mutated again after
+ // m_BundlesCreated is set, so the unlocked read below is safe.
+ if (!m_BundlesCreated)
+ {
+ const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";
+
+ std::vector<BundleFile> Files;
+
+#if ZEN_PLATFORM_WINDOWS
+ Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
+#elif ZEN_PLATFORM_LINUX
+ Files.emplace_back(m_BinariesPath / "zenserver", false);
+ Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
+#elif ZEN_PLATFORM_MAC
+ Files.emplace_back(m_BinariesPath / "zenserver", false);
+#endif
+
+ BundleResult Result;
+ if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
+ {
+ // Unrecoverable: disable all future provisioning attempts.
+ ZEN_WARN("failed to create bundle, cannot provision any agents!");
+ m_AskForAgents.store(false);
+ return;
+ }
+
+ m_Bundles.emplace_back(Result.Locator, Result.BundleDir);
+ m_BundlesCreated = true;
+ }
+
+ // Lazily create the shared HTTP client, also under m_BundleLock.
+ if (!m_HordeClient)
+ {
+ m_HordeClient = std::make_unique<HordeClient>(m_Config);
+ if (!m_HordeClient->Initialize())
+ {
+ ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!");
+ m_AskForAgents.store(false);
+ return;
+ }
+ }
+ }
+
+ if (!m_AskForAgents.load())
+ {
+ return;
+ }
+
+ m_AgentsRequesting.fetch_add(1);
+ auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); });
+
+ // Simple backoff: if the last machine request failed, wait up to 5 seconds
+ // before trying again.
+ //
+ // Note however that it's possible that multiple threads enter this code at
+ // the same time if multiple agents are requested at once, and they will all
+ // see the same last failure time and back off accordingly. We might want to
+ // use a semaphore or similar to limit the number of concurrent requests.
+
+ if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0)
+ {
+ auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
+ const uint64_t ElapsedNs = Now - LastFail;
+ const uint64_t ElapsedMs = ElapsedNs / 1'000'000;
+ if (ElapsedMs < 5000)
+ {
+ // Sleep in 100ms slices so a shutdown request is noticed quickly.
+ const uint64_t WaitMs = 5000 - ElapsedMs;
+ for (uint64_t Waited = 0; Waited < WaitMs && !Wrapper.ShouldExit.load(); Waited += 100)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ if (Wrapper.ShouldExit.load())
+ {
+ return;
+ }
+ }
+ }
+
+ // Target may already be satisfied by agents that came up while we
+ // waited; bail out instead of over-provisioning.
+ if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load())
+ {
+ return;
+ }
+
+ std::string RequestBody = m_HordeClient->BuildRequestBody();
+
+ // Resolve cluster if needed
+ std::string ClusterId = m_Config.Cluster;
+ if (ClusterId == HordeConfig::ClusterAuto)
+ {
+ ClusterInfo Cluster;
+ if (!m_HordeClient->ResolveCluster(RequestBody, Cluster))
+ {
+ ZEN_WARN("failed to resolve cluster");
+ m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+ return;
+ }
+ ClusterId = Cluster.ClusterId;
+ }
+
+ MachineInfo Machine;
+ if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid())
+ {
+ // Record the failure time so subsequent attempts back off (above).
+ m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+ return;
+ }
+
+ m_LastRequestFailTime.store(0);
+
+ if (Wrapper.ShouldExit.load())
+ {
+ return;
+ }
+
+ // Connect to agent and perform handshake
+ Agent = std::make_unique<HordeAgent>(Machine);
+ if (!Agent->IsValid())
+ {
+ ZEN_WARN("agent creation failed for {}:{}", Machine.GetConnectionAddress(), Machine.GetConnectionPort());
+ return;
+ }
+
+ if (!Agent->BeginCommunication())
+ {
+ ZEN_WARN("BeginCommunication failed");
+ return;
+ }
+
+ // Push every prepared bundle to the remote machine, checking for a
+ // shutdown request between (potentially slow) uploads.
+ for (auto& [Locator, BundleDir] : m_Bundles)
+ {
+ if (Wrapper.ShouldExit.load())
+ {
+ return;
+ }
+
+ if (!Agent->UploadBinaries(BundleDir, Locator))
+ {
+ ZEN_WARN("UploadBinaries failed");
+ return;
+ }
+ }
+
+ if (Wrapper.ShouldExit.load())
+ {
+ return;
+ }
+
+ // Build command line for remote zenserver
+ std::vector<std::string> ArgStrings;
+ ArgStrings.push_back("compute");
+ ArgStrings.push_back("--http=asio");
+
+ // TEMP HACK - these should be made fully dynamic
+ // these are currently here to allow spawning the compute agent locally
+ // for debugging purposes (i.e with a local Horde Server+Agent setup)
+ ArgStrings.push_back(fmt::format("--port={}", m_Config.ZenServicePort));
+ ArgStrings.push_back("--data-dir=c:\\temp\\123");
+
+ if (!m_OrchestratorEndpoint.empty())
+ {
+ ExtendableStringBuilder<256> CoordArg;
+ CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint;
+ ArgStrings.emplace_back(CoordArg.ToView());
+ }
+
+ {
+ // Unique per-lease instance id so the orchestrator can tell agents apart.
+ ExtendableStringBuilder<128> IdArg;
+ IdArg << "--instance-id=horde-" << Machine.LeaseId;
+ ArgStrings.emplace_back(IdArg.ToView());
+ }
+
+ // Build the argv view; ArgStrings must outlive Args (it does — both
+ // are locals in this scope and Args is only used in Execute below).
+ std::vector<const char*> Args;
+ Args.reserve(ArgStrings.size());
+ for (const std::string& Arg : ArgStrings)
+ {
+ Args.push_back(Arg.c_str());
+ }
+
+#if ZEN_PLATFORM_WINDOWS
+ // Windows build ships zenserver.exe; run it under Wine when the leased
+ // machine is not itself Windows.
+ const bool UseWine = !Machine.IsWindows;
+ const char* AppName = "zenserver.exe";
+#else
+ const bool UseWine = false;
+ const char* AppName = "zenserver";
+#endif
+
+ Agent->Execute(AppName, Args.data(), Args.size(), nullptr, nullptr, 0, UseWine);
+
+ ZEN_INFO("remote execution started on [{}:{}] lease={}",
+ Machine.GetConnectionAddress(),
+ Machine.GetConnectionPort(),
+ Machine.LeaseId);
+
+ // Success: account the machine's real core count. The `$` guard will
+ // subtract the speculative EstimatedCoresPerAgent as this scope ends.
+ MachineCoreCount = Machine.LogicalCores;
+ m_EstimatedCoreCount.fetch_add(MachineCoreCount);
+ m_ActiveCoreCount.fetch_add(MachineCoreCount);
+ m_AgentsActive.fetch_add(1);
+ }
+
+ // Agent poll loop
+
+ // When the loop exits (agent died, Poll failed, or shutdown requested),
+ // release this machine's contribution to the counters.
+ auto ActiveGuard = MakeGuard([&]() {
+ m_EstimatedCoreCount.fetch_sub(MachineCoreCount);
+ m_ActiveCoreCount.fetch_sub(MachineCoreCount);
+ m_AgentsActive.fetch_sub(1);
+ });
+
+ while (Agent->IsValid() && !Wrapper.ShouldExit.load())
+ {
+ const bool LogOutput = false;
+ if (!Agent->Poll(LogOutput))
+ {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+}
+
+} // namespace zen::horde |