// Copyright Epic Games, Inc. All Rights Reserved.

// NOTE(review): in this copy of the file the angle-bracket include targets and
// most template arguments are missing (bare `#include`, `std::atomic ShouldExit`,
// `std::make_unique()`, `static_cast(...)`, `std::vector Files`, ...). They were
// presumably lost in extraction; restore them from the original file before
// building. The code tokens below are left exactly as found.

#include
#include
#include "hordeagent.h"
#include "hordebundle.h"
#include
#include
#include
#include
#include
#include
#include

namespace zen::horde {

// Per-agent bookkeeping: the worker thread that runs ThreadAgent() plus a flag
// shared between that thread and the provisioner.
//
// ShouldExit is written from two places in this file:
//   - ~HordeProvisioner() sets it to ask the worker to stop, and
//   - the worker's own scope guard in ThreadAgent() sets it when the worker
//     leaves ThreadAgent().
// SetTargetCoreCount() treats a set flag as "this entry can be joined and erased".
struct HordeProvisioner::AgentWrapper
{
    std::thread Thread;
    std::atomic ShouldExit{false};
};

// Stores the configuration, the binaries and working directories and the
// orchestrator endpoint, and looks up the "horde.provisioner" logger.
// Nothing is requested from Horde here; agents are only started from
// SetTargetCoreCount() via RequestAgent().
HordeProvisioner::HordeProvisioner(const HordeConfig& Config, const std::filesystem::path& BinariesPath, const std::filesystem::path& WorkingDir, std::string_view OrchestratorEndpoint)
: m_Config(Config)
, m_BinariesPath(BinariesPath)
, m_WorkingDir(WorkingDir)
, m_OrchestratorEndpoint(OrchestratorEndpoint)
, m_Log(zen::logging::Get("horde.provisioner"))
{
}

// Signals every agent thread to exit and then joins each joinable thread.
// All flags are set in a first pass so the threads can wind down concurrently
// before the joins start.
//
// NOTE(review): the joins run while m_AgentsLock is held. ThreadAgent() as
// written in this file does not take m_AgentsLock directly.
HordeProvisioner::~HordeProvisioner()
{
    std::lock_guard Lock(m_AgentsLock);
    for (auto& Agent : m_Agents)
    {
        Agent->ShouldExit.store(true);
    }
    for (auto& Agent : m_Agents)
    {
        if (Agent->Thread.joinable())
        {
            Agent->Thread.join();
        }
    }
}

// Sets the desired core count (capped at m_Config.MaxCores), starts new agent
// threads until the estimated core count reaches the target, then joins and
// erases agent entries whose ShouldExit flag is set.
//
// Count: requested number of cores; values above m_Config.MaxCores are reduced
//        to m_Config.MaxCores by the std::min below.
void HordeProvisioner::SetTargetCoreCount(uint32_t Count)
{
    ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount");
    m_TargetCoreCount.store(std::min(Count, static_cast(m_Config.MaxCores)));

    // RequestAgent() bumps m_EstimatedCoreCount by EstimatedCoresPerAgent on
    // every call, which is what moves this loop towards its exit condition.
    while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
    {
        // m_AskForAgents is cleared by ThreadAgent() when bundle creation or
        // Horde client initialization fails.
        // NOTE(review): this early return also skips the cleanup pass below.
        if (!m_AskForAgents.load())
        {
            return;
        }
        RequestAgent();
    }

    // Clean up finished agent threads
    std::lock_guard Lock(m_AgentsLock);
    for (auto It = m_Agents.begin(); It != m_Agents.end();)
    {
        if ((*It)->ShouldExit.load())
        {
            if ((*It)->Thread.joinable())
            {
                (*It)->Thread.join();
            }
            // erase() returns the iterator to the next element, so the loop
            // header deliberately has no increment.
            It = m_Agents.erase(It);
        }
        else
        {
            ++It;
        }
    }
}

// Returns a copy of the current provisioning counters.
// Each counter is loaded individually, so the fields are not guaranteed to
// describe one single instant.
ProvisioningStats HordeProvisioner::GetStats() const
{
    ProvisioningStats Stats;
    Stats.TargetCoreCount = m_TargetCoreCount.load();
    Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
    Stats.ActiveCoreCount = m_ActiveCoreCount.load();
    Stats.AgentsActive = m_AgentsActive.load();
    Stats.AgentsRequesting = m_AgentsRequesting.load();
    return Stats;
}

// Returns the number of entries currently in m_Agents, read under m_AgentsLock.
// Entries are only erased by the cleanup pass in SetTargetCoreCount(), so this
// can include agents whose thread has already left ThreadAgent().
uint32_t HordeProvisioner::GetAgentCount() const
{
    std::lock_guard Lock(m_AgentsLock);
    return static_cast(m_Agents.size());
}

// Starts one more agent thread and records it in m_Agents.
// The estimated core count is raised by EstimatedCoresPerAgent up front;
// ThreadAgent() later removes that speculative amount again.
void HordeProvisioner::RequestAgent()
{
    m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent);
    std::lock_guard Lock(m_AgentsLock);
    auto Wrapper = std::make_unique();
    // Ref refers to the object owned by the unique_ptr, so it stays valid after
    // the unique_ptr itself is moved into m_Agents below.
    AgentWrapper& Ref = *Wrapper;
    Wrapper->Thread = std::thread([this, &Ref] { ThreadAgent(Ref); });
    m_Agents.push_back(std::move(Wrapper));
}

// Body of one agent thread.
//
// Steps, in order:
//   1. create the binary bundle and the Horde client once, under m_BundleLock
//   2. optionally back off after a recent failed request
//   3. resolve the cluster (if configured as ClusterAuto) and request a machine
//   4. connect to the agent, upload the bundles and start the remote zenserver
//   5. poll the agent until it becomes invalid, Poll() fails, or exit is requested
//
// Any failure in steps 1-4 returns from the function; the scope guards below
// undo the counter changes made so far.
//
// Wrapper: the entry in m_Agents that owns this thread; its ShouldExit flag is
//          read as the stop request and set when this function is left.
void HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
{
    ZEN_TRACE_CPU("HordeProvisioner::ThreadAgent");

    // Process-wide counter, used only to give each agent thread a distinct name.
    static std::atomic ThreadIndex{0};
    const uint32_t CurrentIndex = ThreadIndex.fetch_add(1);
    zen::SetCurrentThreadName(fmt::format("horde_agent_{}", CurrentIndex));

    std::unique_ptr Agent;
    uint32_t MachineCoreCount = 0;

    // Presumably runs when this function is left by any path (MakeGuard looks
    // like a scope-exit helper -- TODO confirm). Closes the connection if an
    // agent object was created and sets ShouldExit, which SetTargetCoreCount()
    // uses as the signal to join and erase this entry.
    auto _ = MakeGuard([&] {
        if (Agent)
        {
            Agent->CloseConnection();
        }
        Wrapper.ShouldExit.store(true);
    });

    {
        // EstimatedCoreCount is incremented speculatively when the agent is requested
        // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision.
        //
        // The guard below gives that speculative amount back when this scope is
        // left, either through one of the early returns or at its end, where the
        // machine's reported core count has been added instead.
        auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); });

        {
            ZEN_TRACE_CPU("HordeProvisioner::CreateBundles");

            // Serializes the one-time setup below across all agent threads.
            std::lock_guard BundleLock(m_BundleLock);

            if (!m_BundlesCreated)
            {
                const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";

                // The file list depends on the platform this code is built for.
                // NOTE(review): the meaning of the boolean second argument is not
                // visible here; it is true only for "zenserver.debug".
                std::vector Files;
#if ZEN_PLATFORM_WINDOWS
                Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
#elif ZEN_PLATFORM_LINUX
                Files.emplace_back(m_BinariesPath / "zenserver", false);
                Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
#elif ZEN_PLATFORM_MAC
                Files.emplace_back(m_BinariesPath / "zenserver", false);
#endif

                BundleResult Result;
                if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
                {
                    ZEN_WARN("failed to create bundle, cannot provision any agents!");
                    // Stops SetTargetCoreCount() from requesting further agents.
                    m_AskForAgents.store(false);
                    return;
                }

                m_Bundles.emplace_back(Result.Locator, Result.BundleDir);
                m_BundlesCreated = true;
            }

            if (!m_HordeClient)
            {
                m_HordeClient = std::make_unique(m_Config);
                if (!m_HordeClient->Initialize())
                {
                    ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!");
                    // NOTE(review): m_HordeClient stays non-null after this failure;
                    // later threads rely on the m_AskForAgents check below to bail out.
                    m_AskForAgents.store(false);
                    return;
                }
            }
        }

        // Another thread may have hit one of the setup failures above.
        if (!m_AskForAgents.load())
        {
            return;
        }

        // Counted as "requesting" from here until this scope is left.
        m_AgentsRequesting.fetch_add(1);
        auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); });

        // Simple backoff: if the last machine request failed, wait up to 5 seconds
        // before trying again.
        //
        // Note however that it's possible that multiple threads enter this code at
        // the same time if multiple agents are requested at once, and they will all
        // see the same last failure time and back off accordingly. We might want to
        // use a semaphore or similar to limit the number of concurrent requests.
        //
        // A stored value of 0 means "no recent failure" (it is reset to 0 after a
        // successful machine request further down).
        if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0)
        {
            // NOTE(review): the division below treats steady_clock ticks as
            // nanoseconds -- confirm this holds on every target platform.
            auto Now = static_cast(std::chrono::steady_clock::now().time_since_epoch().count());
            const uint64_t ElapsedNs = Now - LastFail;
            const uint64_t ElapsedMs = ElapsedNs / 1'000'000;
            if (ElapsedMs < 5000)
            {
                const uint64_t WaitMs = 5000 - ElapsedMs;
                // Sleep in 100 ms slices so an exit request is noticed promptly.
                for (uint64_t Waited = 0; Waited < WaitMs && !Wrapper.ShouldExit.load(); Waited += 100)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                }
                if (Wrapper.ShouldExit.load())
                {
                    return;
                }
            }
        }

        // Enough cores are already active; no need to lease another machine.
        if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load())
        {
            return;
        }

        std::string RequestBody = m_HordeClient->BuildRequestBody();

        // Resolve cluster if needed
        std::string ClusterId = m_Config.Cluster;
        if (ClusterId == HordeConfig::ClusterAuto)
        {
            ClusterInfo Cluster;
            if (!m_HordeClient->ResolveCluster(RequestBody, Cluster))
            {
                ZEN_WARN("failed to resolve cluster");
                // Record the failure time so later attempts back off (see above).
                m_LastRequestFailTime.store(static_cast(std::chrono::steady_clock::now().time_since_epoch().count()));
                return;
            }
            ClusterId = Cluster.ClusterId;
        }

        MachineInfo Machine;
        if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid())
        {
            // No warning is logged on this path; only the failure time is recorded.
            m_LastRequestFailTime.store(static_cast(std::chrono::steady_clock::now().time_since_epoch().count()));
            return;
        }

        // Success: clear the backoff marker.
        m_LastRequestFailTime.store(0);

        if (Wrapper.ShouldExit.load())
        {
            return;
        }

        // Connect to agent and perform handshake
        Agent = std::make_unique(Machine);
        if (!Agent->IsValid())
        {
            ZEN_WARN("agent creation failed for {}:{}", Machine.GetConnectionAddress(), Machine.GetConnectionPort());
            return;
        }

        if (!Agent->BeginCommunication())
        {
            ZEN_WARN("BeginCommunication failed");
            return;
        }

        // Upload every bundle, checking for an exit request before each one.
        for (auto& [Locator, BundleDir] : m_Bundles)
        {
            if (Wrapper.ShouldExit.load())
            {
                return;
            }
            if (!Agent->UploadBinaries(BundleDir, Locator))
            {
                ZEN_WARN("UploadBinaries failed");
                return;
            }
        }

        if (Wrapper.ShouldExit.load())
        {
            return;
        }

        // Build command line for remote zenserver
        std::vector ArgStrings;
        ArgStrings.push_back("compute");
        ArgStrings.push_back("--http=asio");

        // TEMP HACK - these should be made fully dynamic
        // these are currently here to allow spawning the compute agent locally
        // for debugging purposes (i.e with a local Horde Server+Agent setup)
        ArgStrings.push_back(fmt::format("--port={}", m_Config.ZenServicePort));
        // NOTE(review): hard-coded Windows-style data directory, passed on every platform.
        ArgStrings.push_back("--data-dir=c:\\temp\\123");

        if (!m_OrchestratorEndpoint.empty())
        {
            ExtendableStringBuilder<256> CoordArg;
            CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint;
            ArgStrings.emplace_back(CoordArg.ToView());
        }

        {
            // The instance id is derived from the lease id of the machine.
            ExtendableStringBuilder<128> IdArg;
            IdArg << "--instance-id=horde-" << Machine.LeaseId;
            ArgStrings.emplace_back(IdArg.ToView());
        }

        // Args holds pointers into ArgStrings, so ArgStrings must not be modified
        // between here and the Execute() call.
        std::vector Args;
        Args.reserve(ArgStrings.size());
        for (const std::string& Arg : ArgStrings)
        {
            Args.push_back(Arg.c_str());
        }

        // On Windows builds the executable is zenserver.exe and UseWine is set when
        // the leased machine does not report itself as Windows; on other builds
        // UseWine is always false.
#if ZEN_PLATFORM_WINDOWS
        const bool UseWine = !Machine.IsWindows;
        const char* AppName = "zenserver.exe";
#else
        const bool UseWine = false;
        const char* AppName = "zenserver";
#endif

        // NOTE(review): no result of Execute() is inspected here -- confirm whether
        // it can fail and whether that should abort before the counters are updated.
        Agent->Execute(AppName, Args.data(), Args.size(), nullptr, nullptr, 0, UseWine);

        ZEN_INFO("remote execution started on [{}:{}] lease={}", Machine.GetConnectionAddress(), Machine.GetConnectionPort(), Machine.LeaseId);

        // Replace the speculative estimate with the machine's reported core count:
        // the real count is added here and the guard `$` removes
        // EstimatedCoresPerAgent as this scope closes.
        MachineCoreCount = Machine.LogicalCores;
        m_EstimatedCoreCount.fetch_add(MachineCoreCount);
        m_ActiveCoreCount.fetch_add(MachineCoreCount);
        m_AgentsActive.fetch_add(1);
    }

    // Agent poll loop
    //
    // Only reached on the success path (every failure above returns from inside
    // the scope), so Agent is non-null here. ActiveGuard reverses the three
    // counter updates made just before the scope closed.
    auto ActiveGuard = MakeGuard([&]() {
        m_EstimatedCoreCount.fetch_sub(MachineCoreCount);
        m_ActiveCoreCount.fetch_sub(MachineCoreCount);
        m_AgentsActive.fetch_sub(1);
    });

    // Poll every 100 ms until the agent becomes invalid, Poll() returns false, or
    // an exit is requested via ShouldExit.
    while (Agent->IsValid() && !Wrapper.ShouldExit.load())
    {
        const bool LogOutput = false;
        if (!Agent->Poll(LogOutput))
        {
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

}  // namespace zen::horde