// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include "hordeagent.h" #include "hordebundle.h" #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include #include namespace zen::horde { HordeProvisioner::HordeProvisioner(const HordeConfig& Config, const std::filesystem::path& BinariesPath, const std::filesystem::path& WorkingDir, std::string_view OrchestratorEndpoint, std::string_view CoordinatorSession, bool CleanStart, std::string_view TraceHost) : m_Config(Config) , m_BinariesPath(BinariesPath) , m_WorkingDir(WorkingDir) , m_OrchestratorEndpoint(OrchestratorEndpoint) , m_CoordinatorSession(CoordinatorSession) , m_CleanStart(CleanStart) , m_TraceHost(TraceHost) , m_Log(zen::logging::Get("horde.provisioner")) { m_IoContext = std::make_unique(); auto Work = asio::make_work_guard(*m_IoContext); for (int i = 0; i < IoThreadCount; ++i) { m_IoThreads.emplace_back([this, i, Work] { zen::SetCurrentThreadName(fmt::format("horde_io_{}", i)); m_IoContext->run(); }); } } HordeProvisioner::~HordeProvisioner() { m_AskForAgents.store(false); m_ShutdownEvent.Set(); // Shut down async agents and io_context { std::lock_guard Lock(m_AsyncAgentsLock); for (auto& Entry : m_AsyncAgents) { Entry.Agent->Cancel(); } m_AsyncAgents.clear(); } m_IoContext->stop(); for (auto& Thread : m_IoThreads) { if (Thread.joinable()) { Thread.join(); } } // Wait for all pool work items to finish before destroying members they reference if (m_PendingWorkItems.load() > 0) { m_AllWorkDone.Wait(); } } void HordeProvisioner::SetTargetCoreCount(uint32_t Count) { ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount"); const uint32_t ClampedCount = std::min(Count, static_cast(m_Config.MaxCores)); const uint32_t PreviousTarget = m_TargetCoreCount.exchange(ClampedCount); if (ClampedCount != PreviousTarget) { ZEN_INFO("target core count changed: {} -> {} (active={}, estimated={})", PreviousTarget, ClampedCount, m_ActiveCoreCount.load(), m_EstimatedCoreCount.load()); } // Only provision if the gap is at least one agent-sized chunk. Without // this, draining a 32-core agent to cover a 28-core excess would leave a // 4-core gap that triggers a 32-core provision, which triggers another // drain, ad infinitum. while (m_EstimatedCoreCount.load() + EstimatedCoresPerAgent <= m_TargetCoreCount.load()) { if (!m_AskForAgents.load()) { return; } RequestAgent(); } // Scale down async agents { std::lock_guard AsyncLock(m_AsyncAgentsLock); uint32_t AsyncActive = m_ActiveCoreCount.load(); uint32_t AsyncTarget = m_TargetCoreCount.load(); uint32_t AlreadyDrainingCores = 0; for (const auto& Entry : m_AsyncAgents) { if (Entry.Draining) { AlreadyDrainingCores += Entry.CoreCount; } } uint32_t EffectiveAsync = (AsyncActive > AlreadyDrainingCores) ? AsyncActive - AlreadyDrainingCores : 0; if (EffectiveAsync > AsyncTarget) { struct Candidate { AsyncAgentEntry* Entry; int Workload; }; std::vector Candidates; for (auto& Entry : m_AsyncAgents) { if (Entry.Draining || Entry.RemoteEndpoint.empty()) { continue; } int Workload = 0; bool Reachable = false; HttpClientSettings Settings; Settings.LogCategory = "horde.drain"; Settings.ConnectTimeout = std::chrono::milliseconds{2000}; Settings.Timeout = std::chrono::milliseconds{3000}; try { HttpClient Client(Entry.RemoteEndpoint, Settings); HttpClient::Response Resp = Client.Get("/compute/session/status"); if (Resp.IsSuccess()) { CbObject Status = Resp.AsObject(); Workload = Status["actions_pending"].AsInt32(0) + Status["actions_running"].AsInt32(0); Reachable = true; } } catch (const std::exception& Ex) { ZEN_DEBUG("agent lease={} not yet reachable for drain: {}", Entry.LeaseId, Ex.what()); } if (Reachable) { Candidates.push_back({&Entry, Workload}); } } const uint32_t ExcessCores = EffectiveAsync - AsyncTarget; uint32_t CoresDrained = 0; while (CoresDrained < ExcessCores && !Candidates.empty()) { const uint32_t Remaining = ExcessCores - CoresDrained; Candidates.erase(std::remove_if(Candidates.begin(), Candidates.end(), [Remaining](const Candidate& C) { return C.Entry->CoreCount > Remaining; }), Candidates.end()); if (Candidates.empty()) { break; } Candidate* Best = &Candidates[0]; for (auto& C : Candidates) { if (C.Entry->CoreCount > Best->Entry->CoreCount || (C.Entry->CoreCount == Best->Entry->CoreCount && C.Workload < Best->Workload)) { Best = &C; } } ZEN_INFO("draining async agent lease={} ({} cores, workload={})", Best->Entry->LeaseId, Best->Entry->CoreCount, Best->Workload); DrainAsyncAgent(*Best->Entry); CoresDrained += Best->Entry->CoreCount; AsyncAgentEntry* Drained = Best->Entry; Candidates.erase( std::remove_if(Candidates.begin(), Candidates.end(), [Drained](const Candidate& C) { return C.Entry == Drained; }), Candidates.end()); } } } } ProvisioningStats HordeProvisioner::GetStats() const { ProvisioningStats Stats; Stats.TargetCoreCount = m_TargetCoreCount.load(); Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); Stats.ActiveCoreCount = m_ActiveCoreCount.load(); Stats.AgentsActive = m_AgentsActive.load(); Stats.AgentsRequesting = m_AgentsRequesting.load(); return Stats; } uint32_t HordeProvisioner::GetAgentCount() const { std::lock_guard Lock(m_AsyncAgentsLock); return static_cast(m_AsyncAgents.size()); } compute::AgentProvisioningStatus HordeProvisioner::GetAgentStatus(std::string_view WorkerId) const { // Worker IDs are "horde-{LeaseId}" - strip the prefix to match lease ID constexpr std::string_view Prefix = "horde-"; if (!WorkerId.starts_with(Prefix)) { return compute::AgentProvisioningStatus::Unknown; } std::string_view LeaseId = WorkerId.substr(Prefix.size()); std::lock_guard AsyncLock(m_AsyncAgentsLock); for (const auto& Entry : m_AsyncAgents) { if (Entry.LeaseId == LeaseId) { if (Entry.Draining) { return compute::AgentProvisioningStatus::Draining; } return compute::AgentProvisioningStatus::Active; } } // Check recently-drained agents that have already been cleaned up std::string WorkerIdStr(WorkerId); if (m_RecentlyDrainedWorkerIds.erase(WorkerIdStr) > 0) { // Also remove from the ordering queue so size accounting stays consistent. auto It = std::find(m_RecentlyDrainedOrder.begin(), m_RecentlyDrainedOrder.end(), WorkerIdStr); if (It != m_RecentlyDrainedOrder.end()) { m_RecentlyDrainedOrder.erase(It); } return compute::AgentProvisioningStatus::Draining; } return compute::AgentProvisioningStatus::Unknown; } std::vector HordeProvisioner::BuildAgentArgs(const MachineInfo& Machine) const { std::vector Args; Args.emplace_back("compute"); Args.emplace_back("--http=asio"); Args.push_back(fmt::format("--port={}", m_Config.ZenServicePort)); Args.emplace_back("--data-dir=%UE_HORDE_SHARED_DIR%\\zen"); if (m_CleanStart) { Args.emplace_back("--clean"); } if (!m_OrchestratorEndpoint.empty()) { ExtendableStringBuilder<256> CoordArg; CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint; Args.emplace_back(CoordArg.ToView()); } { ExtendableStringBuilder<128> IdArg; IdArg << "--instance-id=horde-" << Machine.LeaseId; Args.emplace_back(IdArg.ToView()); } if (!m_CoordinatorSession.empty()) { ExtendableStringBuilder<128> SessionArg; SessionArg << "--coordinator-session=" << m_CoordinatorSession; Args.emplace_back(SessionArg.ToView()); } if (!m_TraceHost.empty()) { ExtendableStringBuilder<128> TraceArg; TraceArg << "--tracehost=" << m_TraceHost; Args.emplace_back(TraceArg.ToView()); } // In relay mode, the remote zenserver's local address is not reachable from the // orchestrator. Pass the relay-visible endpoint so it announces the correct URL. if (Machine.Mode == ConnectionMode::Relay) { const auto [Addr, Port] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort); if (Addr.find(':') != std::string::npos) { Args.push_back(fmt::format("--announce-url=http://[{}]:{}", Addr, Port)); } else { Args.push_back(fmt::format("--announce-url=http://{}:{}", Addr, Port)); } } return Args; } bool HordeProvisioner::InitializeHordeClient() { ZEN_TRACE_CPU("HordeProvisioner::InitializeHordeClient"); std::lock_guard BundleLock(m_BundleLock); if (!m_BundlesCreated) { const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles"; std::vector Files; #if ZEN_PLATFORM_WINDOWS Files.emplace_back(m_BinariesPath / "zenserver.exe", false); Files.emplace_back(m_BinariesPath / "zenserver.pdb", true); #elif ZEN_PLATFORM_LINUX Files.emplace_back(m_BinariesPath / "zenserver", false); Files.emplace_back(m_BinariesPath / "zenserver.debug", true); #elif ZEN_PLATFORM_MAC Files.emplace_back(m_BinariesPath / "zenserver", false); #endif BundleResult Result; if (!BundleCreator::CreateBundle(Files, OutputDir, Result)) { ZEN_WARN("failed to create bundle, cannot provision any agents!"); m_AskForAgents.store(false); m_ShutdownEvent.Set(); return false; } m_Bundles.emplace_back(Result.Locator, Result.BundleDir); m_BundlesCreated = true; } if (!m_HordeClient) { m_HordeClient = std::make_unique(m_Config); if (!m_HordeClient->Initialize()) { ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!"); m_AskForAgents.store(false); m_ShutdownEvent.Set(); return false; } } return true; } void HordeProvisioner::RequestAgent() { m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent); if (m_PendingWorkItems.fetch_add(1) == 0) { m_AllWorkDone.Reset(); } GetSmallWorkerPool(EWorkloadType::Background) .ScheduleWork( [this] { ProvisionAgent(); if (m_PendingWorkItems.fetch_sub(1) == 1) { m_AllWorkDone.Set(); } }, WorkerThreadPool::EMode::EnableBacklog); } void HordeProvisioner::ProvisionAgent() { ZEN_TRACE_CPU("HordeProvisioner::ProvisionAgent"); // EstimatedCoreCount is incremented speculatively when the agent is requested // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision. auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); }); if (!InitializeHordeClient()) { return; } if (!m_AskForAgents.load()) { return; } m_AgentsRequesting.fetch_add(1); auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); }); // Simple backoff: if the last machine request failed, wait up to 5 seconds // before trying again. // // Note however that it's possible that multiple threads enter this code at // the same time if multiple agents are requested at once, and they will all // see the same last failure time and back off accordingly. We might want to // use a semaphore or similar to limit the number of concurrent requests. if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0) { auto Now = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); const uint64_t ElapsedNs = Now - LastFail; const uint64_t ElapsedMs = ElapsedNs / 1'000'000; if (ElapsedMs < 5000) { // Wait on m_ShutdownEvent so shutdown wakes this pool thread immediately instead // of stalling for up to 5s in 100ms sleep chunks. Wait() returns true iff the // event was signaled (shutdown); false means the backoff elapsed normally. const uint64_t WaitMs = 5000 - ElapsedMs; if (m_ShutdownEvent.Wait(static_cast(WaitMs))) { return; } } } if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load()) { return; } std::string RequestBody = m_HordeClient->BuildRequestBody(); // Resolve cluster if needed std::string ClusterId = m_Config.Cluster; if (ClusterId == HordeConfig::ClusterAuto) { ClusterInfo Cluster; if (!m_HordeClient->ResolveCluster(RequestBody, Cluster)) { ZEN_WARN("failed to resolve cluster"); m_LastRequestFailTime.store(static_cast(std::chrono::steady_clock::now().time_since_epoch().count())); return; } ClusterId = Cluster.ClusterId; } ZEN_INFO("requesting machine from Horde (cluster='{}', cores={}/{})", ClusterId.empty() ? "default" : ClusterId.c_str(), m_ActiveCoreCount.load(), m_TargetCoreCount.load()); MachineInfo Machine; if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid()) { m_LastRequestFailTime.store(static_cast(std::chrono::steady_clock::now().time_since_epoch().count())); return; } m_LastRequestFailTime.store(0); if (!m_AskForAgents.load()) { return; } AsyncAgentConfig AgentConfig; AgentConfig.Machine = Machine; AgentConfig.Bundles = m_Bundles; AgentConfig.Args = BuildAgentArgs(Machine); #if ZEN_PLATFORM_WINDOWS AgentConfig.UseWine = !Machine.IsWindows; AgentConfig.Executable = "zenserver.exe"; #else AgentConfig.UseWine = false; AgentConfig.Executable = "zenserver"; #endif auto AsyncAgent = std::make_shared(*m_IoContext); AsyncAgentEntry Entry; Entry.Agent = AsyncAgent; Entry.LeaseId = Machine.LeaseId; Entry.CoreCount = Machine.LogicalCores; const auto [EndpointAddr, EndpointPort] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort); if (EndpointAddr.find(':') != std::string::npos) { Entry.RemoteEndpoint = fmt::format("http://[{}]:{}", EndpointAddr, EndpointPort); } else { Entry.RemoteEndpoint = fmt::format("http://{}:{}", EndpointAddr, EndpointPort); } { std::lock_guard Lock(m_AsyncAgentsLock); m_AsyncAgents.push_back(std::move(Entry)); } AsyncAgent->Start(std::move(AgentConfig), [this, AsyncAgent](const AsyncAgentResult& Result) { if (Result.CoreCount > 0) { // Only subtract estimated cores if not already subtracted by DrainAsyncAgent bool WasDraining = false; { std::lock_guard Lock(m_AsyncAgentsLock); for (const auto& Entry : m_AsyncAgents) { if (Entry.Agent == AsyncAgent) { WasDraining = Entry.Draining; break; } } } if (!WasDraining) { m_EstimatedCoreCount.fetch_sub(Result.CoreCount); } m_ActiveCoreCount.fetch_sub(Result.CoreCount); m_AgentsActive.fetch_sub(1); } OnAsyncAgentDone(AsyncAgent); }); // Track active cores (estimated was already added by RequestAgent) m_EstimatedCoreCount.fetch_add(Machine.LogicalCores); m_ActiveCoreCount.fetch_add(Machine.LogicalCores); m_AgentsActive.fetch_add(1); } void HordeProvisioner::DrainAsyncAgent(AsyncAgentEntry& Entry) { Entry.Draining = true; m_EstimatedCoreCount.fetch_sub(Entry.CoreCount); m_AgentsDraining.fetch_add(1); HttpClientSettings Settings; Settings.LogCategory = "horde.drain"; Settings.ConnectTimeout = std::chrono::milliseconds{5000}; Settings.Timeout = std::chrono::milliseconds{10000}; try { HttpClient Client(Entry.RemoteEndpoint, Settings); HttpClient::Response Response = Client.Post("/compute/session/drain"); if (!Response.IsSuccess()) { ZEN_WARN("drain[{}]: POST session/drain failed: HTTP {}", Entry.LeaseId, static_cast(Response.StatusCode)); return; } ZEN_INFO("drain[{}]: session/drain accepted, sending sunset", Entry.LeaseId); (void)Client.Post("/compute/session/sunset"); } catch (const std::exception& Ex) { ZEN_WARN("drain[{}]: exception: {}", Entry.LeaseId, Ex.what()); } } void HordeProvisioner::OnAsyncAgentDone(std::shared_ptr Agent) { std::lock_guard Lock(m_AsyncAgentsLock); for (auto It = m_AsyncAgents.begin(); It != m_AsyncAgents.end(); ++It) { if (It->Agent == Agent) { if (It->Draining) { m_AgentsDraining.fetch_sub(1); std::string WorkerId = "horde-" + It->LeaseId; if (m_RecentlyDrainedWorkerIds.insert(WorkerId).second) { m_RecentlyDrainedOrder.push_back(WorkerId); while (m_RecentlyDrainedOrder.size() > RecentlyDrainedCapacity) { m_RecentlyDrainedWorkerIds.erase(m_RecentlyDrainedOrder.front()); m_RecentlyDrainedOrder.pop_front(); } } } m_AsyncAgents.erase(It); break; } } } } // namespace zen::horde