aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordeprovisioner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-23 18:16:57 +0200
committerStefan Boberg <[email protected]>2026-04-23 18:16:57 +0200
commit0232b991cd7d8e3a2114ea30e4591dd3e7b65c36 (patch)
tree94730e7594fd09ae1fa820391ce311f6daf13905 /src/zenhorde/hordeprovisioner.cpp
parentFix forward declaration order for s_GotSigWinch and SigWinchHandler (diff)
parenttrace: declare Region event name fields as AnsiString (#1012) (diff)
downloadarchived-zen-sb/zen-help.tar.xz
archived-zen-sb/zen-help.zip
Merge branch 'main' into sb/zen-helpsb/zen-help
- Combine HelpCommand (this branch) with HistoryCommand (main) in zen CLI dispatcher - Keep filter-aware TuiPickOne rewrite; adopt main's ASCII arrow glyphs in doc comment
Diffstat (limited to 'src/zenhorde/hordeprovisioner.cpp')
-rw-r--r--src/zenhorde/hordeprovisioner.cpp682
1 files changed, 473 insertions, 209 deletions
diff --git a/src/zenhorde/hordeprovisioner.cpp b/src/zenhorde/hordeprovisioner.cpp
index f88c95da2..ea0ea1e83 100644
--- a/src/zenhorde/hordeprovisioner.cpp
+++ b/src/zenhorde/hordeprovisioner.cpp
@@ -6,49 +6,83 @@
#include "hordeagent.h"
#include "hordebundle.h"
+#include <zencore/compactbinary.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
+#include <zenhttp/httpclient.h>
+#include <zenutil/workerpools.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <algorithm>
#include <chrono>
#include <thread>
namespace zen::horde {
-struct HordeProvisioner::AgentWrapper
-{
- std::thread Thread;
- std::atomic<bool> ShouldExit{false};
-};
-
HordeProvisioner::HordeProvisioner(const HordeConfig& Config,
const std::filesystem::path& BinariesPath,
const std::filesystem::path& WorkingDir,
- std::string_view OrchestratorEndpoint)
+ std::string_view OrchestratorEndpoint,
+ std::string_view CoordinatorSession,
+ bool CleanStart,
+ std::string_view TraceHost)
: m_Config(Config)
, m_BinariesPath(BinariesPath)
, m_WorkingDir(WorkingDir)
, m_OrchestratorEndpoint(OrchestratorEndpoint)
+, m_CoordinatorSession(CoordinatorSession)
+, m_CleanStart(CleanStart)
+, m_TraceHost(TraceHost)
, m_Log(zen::logging::Get("horde.provisioner"))
{
+ m_IoContext = std::make_unique<asio::io_context>();
+
+ auto Work = asio::make_work_guard(*m_IoContext);
+ for (int i = 0; i < IoThreadCount; ++i)
+ {
+ m_IoThreads.emplace_back([this, i, Work] {
+ zen::SetCurrentThreadName(fmt::format("horde_io_{}", i));
+ m_IoContext->run();
+ });
+ }
}
HordeProvisioner::~HordeProvisioner()
{
- std::lock_guard<std::mutex> Lock(m_AgentsLock);
- for (auto& Agent : m_Agents)
+ m_AskForAgents.store(false);
+ m_ShutdownEvent.Set();
+
+ // Shut down async agents and io_context
{
- Agent->ShouldExit.store(true);
+ std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock);
+ for (auto& Entry : m_AsyncAgents)
+ {
+ Entry.Agent->Cancel();
+ }
+ m_AsyncAgents.clear();
}
- for (auto& Agent : m_Agents)
+
+ m_IoContext->stop();
+
+ for (auto& Thread : m_IoThreads)
{
- if (Agent->Thread.joinable())
+ if (Thread.joinable())
{
- Agent->Thread.join();
+ Thread.join();
}
}
+
+ // Wait for all pool work items to finish before destroying members they reference
+ if (m_PendingWorkItems.load() > 0)
+ {
+ m_AllWorkDone.Wait();
+ }
}
void
@@ -56,9 +90,23 @@ HordeProvisioner::SetTargetCoreCount(uint32_t Count)
{
ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount");
- m_TargetCoreCount.store(std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)));
+ const uint32_t ClampedCount = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores));
+ const uint32_t PreviousTarget = m_TargetCoreCount.exchange(ClampedCount);
+
+ if (ClampedCount != PreviousTarget)
+ {
+ ZEN_INFO("target core count changed: {} -> {} (active={}, estimated={})",
+ PreviousTarget,
+ ClampedCount,
+ m_ActiveCoreCount.load(),
+ m_EstimatedCoreCount.load());
+ }
- while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+ // Only provision if the gap is at least one agent-sized chunk. Without
+ // this, draining a 32-core agent to cover a 28-core excess would leave a
+ // 4-core gap that triggers a 32-core provision, which triggers another
+ // drain, ad infinitum.
+ while (m_EstimatedCoreCount.load() + EstimatedCoresPerAgent <= m_TargetCoreCount.load())
{
if (!m_AskForAgents.load())
{
@@ -67,21 +115,108 @@ HordeProvisioner::SetTargetCoreCount(uint32_t Count)
RequestAgent();
}
- // Clean up finished agent threads
- std::lock_guard<std::mutex> Lock(m_AgentsLock);
- for (auto It = m_Agents.begin(); It != m_Agents.end();)
+ // Scale down async agents
{
- if ((*It)->ShouldExit.load())
+ std::lock_guard<std::mutex> AsyncLock(m_AsyncAgentsLock);
+
+ uint32_t AsyncActive = m_ActiveCoreCount.load();
+ uint32_t AsyncTarget = m_TargetCoreCount.load();
+
+ uint32_t AlreadyDrainingCores = 0;
+ for (const auto& Entry : m_AsyncAgents)
{
- if ((*It)->Thread.joinable())
+ if (Entry.Draining)
{
- (*It)->Thread.join();
+ AlreadyDrainingCores += Entry.CoreCount;
}
- It = m_Agents.erase(It);
}
- else
+
+ uint32_t EffectiveAsync = (AsyncActive > AlreadyDrainingCores) ? AsyncActive - AlreadyDrainingCores : 0;
+
+ if (EffectiveAsync > AsyncTarget)
{
- ++It;
+ struct Candidate
+ {
+ AsyncAgentEntry* Entry;
+ int Workload;
+ };
+ std::vector<Candidate> Candidates;
+
+ for (auto& Entry : m_AsyncAgents)
+ {
+ if (Entry.Draining || Entry.RemoteEndpoint.empty())
+ {
+ continue;
+ }
+
+ int Workload = 0;
+ bool Reachable = false;
+ HttpClientSettings Settings;
+ Settings.LogCategory = "horde.drain";
+ Settings.ConnectTimeout = std::chrono::milliseconds{2000};
+ Settings.Timeout = std::chrono::milliseconds{3000};
+ try
+ {
+ HttpClient Client(Entry.RemoteEndpoint, Settings);
+ HttpClient::Response Resp = Client.Get("/compute/session/status");
+ if (Resp.IsSuccess())
+ {
+ CbObject Status = Resp.AsObject();
+ Workload = Status["actions_pending"].AsInt32(0) + Status["actions_running"].AsInt32(0);
+ Reachable = true;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_DEBUG("agent lease={} not yet reachable for drain: {}", Entry.LeaseId, Ex.what());
+ }
+
+ if (Reachable)
+ {
+ Candidates.push_back({&Entry, Workload});
+ }
+ }
+
+ const uint32_t ExcessCores = EffectiveAsync - AsyncTarget;
+ uint32_t CoresDrained = 0;
+
+ while (CoresDrained < ExcessCores && !Candidates.empty())
+ {
+ const uint32_t Remaining = ExcessCores - CoresDrained;
+
+ Candidates.erase(std::remove_if(Candidates.begin(),
+ Candidates.end(),
+ [Remaining](const Candidate& C) { return C.Entry->CoreCount > Remaining; }),
+ Candidates.end());
+
+ if (Candidates.empty())
+ {
+ break;
+ }
+
+ Candidate* Best = &Candidates[0];
+ for (auto& C : Candidates)
+ {
+ if (C.Entry->CoreCount > Best->Entry->CoreCount ||
+ (C.Entry->CoreCount == Best->Entry->CoreCount && C.Workload < Best->Workload))
+ {
+ Best = &C;
+ }
+ }
+
+ ZEN_INFO("draining async agent lease={} ({} cores, workload={})",
+ Best->Entry->LeaseId,
+ Best->Entry->CoreCount,
+ Best->Workload);
+
+ DrainAsyncAgent(*Best->Entry);
+ CoresDrained += Best->Entry->CoreCount;
+
+ AsyncAgentEntry* Drained = Best->Entry;
+ Candidates.erase(
+ std::remove_if(Candidates.begin(), Candidates.end(), [Drained](const Candidate& C) { return C.Entry == Drained; }),
+ Candidates.end());
+ }
}
}
}
@@ -101,266 +236,395 @@ HordeProvisioner::GetStats() const
uint32_t
HordeProvisioner::GetAgentCount() const
{
- std::lock_guard<std::mutex> Lock(m_AgentsLock);
- return static_cast<uint32_t>(m_Agents.size());
+ std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock);
+ return static_cast<uint32_t>(m_AsyncAgents.size());
}
-void
-HordeProvisioner::RequestAgent()
+compute::AgentProvisioningStatus
+HordeProvisioner::GetAgentStatus(std::string_view WorkerId) const
{
- m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent);
+ // Worker IDs are "horde-{LeaseId}" - strip the prefix to match lease ID
+ constexpr std::string_view Prefix = "horde-";
+ if (!WorkerId.starts_with(Prefix))
+ {
+ return compute::AgentProvisioningStatus::Unknown;
+ }
+ std::string_view LeaseId = WorkerId.substr(Prefix.size());
- std::lock_guard<std::mutex> Lock(m_AgentsLock);
+ std::lock_guard<std::mutex> AsyncLock(m_AsyncAgentsLock);
+ for (const auto& Entry : m_AsyncAgents)
+ {
+ if (Entry.LeaseId == LeaseId)
+ {
+ if (Entry.Draining)
+ {
+ return compute::AgentProvisioningStatus::Draining;
+ }
+ return compute::AgentProvisioningStatus::Active;
+ }
+ }
- auto Wrapper = std::make_unique<AgentWrapper>();
- AgentWrapper& Ref = *Wrapper;
- Wrapper->Thread = std::thread([this, &Ref] { ThreadAgent(Ref); });
+ // Check recently-drained agents that have already been cleaned up
+ std::string WorkerIdStr(WorkerId);
+ if (m_RecentlyDrainedWorkerIds.erase(WorkerIdStr) > 0)
+ {
+ // Also remove from the ordering queue so size accounting stays consistent.
+ auto It = std::find(m_RecentlyDrainedOrder.begin(), m_RecentlyDrainedOrder.end(), WorkerIdStr);
+ if (It != m_RecentlyDrainedOrder.end())
+ {
+ m_RecentlyDrainedOrder.erase(It);
+ }
+ return compute::AgentProvisioningStatus::Draining;
+ }
- m_Agents.push_back(std::move(Wrapper));
+ return compute::AgentProvisioningStatus::Unknown;
}
-void
-HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
+std::vector<std::string>
+HordeProvisioner::BuildAgentArgs(const MachineInfo& Machine) const
{
- ZEN_TRACE_CPU("HordeProvisioner::ThreadAgent");
+ std::vector<std::string> Args;
+ Args.emplace_back("compute");
+ Args.emplace_back("--http=asio");
+ Args.push_back(fmt::format("--port={}", m_Config.ZenServicePort));
+ Args.emplace_back("--data-dir=%UE_HORDE_SHARED_DIR%\\zen");
- static std::atomic<uint32_t> ThreadIndex{0};
- const uint32_t CurrentIndex = ThreadIndex.fetch_add(1);
+ if (m_CleanStart)
+ {
+ Args.emplace_back("--clean");
+ }
- zen::SetCurrentThreadName(fmt::format("horde_agent_{}", CurrentIndex));
+ if (!m_OrchestratorEndpoint.empty())
+ {
+ ExtendableStringBuilder<256> CoordArg;
+ CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint;
+ Args.emplace_back(CoordArg.ToView());
+ }
- std::unique_ptr<HordeAgent> Agent;
- uint32_t MachineCoreCount = 0;
+ {
+ ExtendableStringBuilder<128> IdArg;
+ IdArg << "--instance-id=horde-" << Machine.LeaseId;
+ Args.emplace_back(IdArg.ToView());
+ }
- auto _ = MakeGuard([&] {
- if (Agent)
- {
- Agent->CloseConnection();
- }
- Wrapper.ShouldExit.store(true);
- });
+ if (!m_CoordinatorSession.empty())
+ {
+ ExtendableStringBuilder<128> SessionArg;
+ SessionArg << "--coordinator-session=" << m_CoordinatorSession;
+ Args.emplace_back(SessionArg.ToView());
+ }
+ if (!m_TraceHost.empty())
{
- // EstimatedCoreCount is incremented speculatively when the agent is requested
- // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision.
- auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); });
+ ExtendableStringBuilder<128> TraceArg;
+ TraceArg << "--tracehost=" << m_TraceHost;
+ Args.emplace_back(TraceArg.ToView());
+ }
+ // In relay mode, the remote zenserver's local address is not reachable from the
+ // orchestrator. Pass the relay-visible endpoint so it announces the correct URL.
+ if (Machine.Mode == ConnectionMode::Relay)
+ {
+ const auto [Addr, Port] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort);
+ if (Addr.find(':') != std::string::npos)
+ {
+ Args.push_back(fmt::format("--announce-url=http://[{}]:{}", Addr, Port));
+ }
+ else
{
- ZEN_TRACE_CPU("HordeProvisioner::CreateBundles");
+ Args.push_back(fmt::format("--announce-url=http://{}:{}", Addr, Port));
+ }
+ }
- std::lock_guard<std::mutex> BundleLock(m_BundleLock);
+ return Args;
+}
- if (!m_BundlesCreated)
- {
- const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";
+bool
+HordeProvisioner::InitializeHordeClient()
+{
+ ZEN_TRACE_CPU("HordeProvisioner::InitializeHordeClient");
+
+ std::lock_guard<std::mutex> BundleLock(m_BundleLock);
- std::vector<BundleFile> Files;
+ if (!m_BundlesCreated)
+ {
+ const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";
+
+ std::vector<BundleFile> Files;
#if ZEN_PLATFORM_WINDOWS
- Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
+ Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
+ Files.emplace_back(m_BinariesPath / "zenserver.pdb", true);
#elif ZEN_PLATFORM_LINUX
- Files.emplace_back(m_BinariesPath / "zenserver", false);
- Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
+ Files.emplace_back(m_BinariesPath / "zenserver", false);
+ Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
#elif ZEN_PLATFORM_MAC
- Files.emplace_back(m_BinariesPath / "zenserver", false);
+ Files.emplace_back(m_BinariesPath / "zenserver", false);
#endif
- BundleResult Result;
- if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
- {
- ZEN_WARN("failed to create bundle, cannot provision any agents!");
- m_AskForAgents.store(false);
- return;
- }
-
- m_Bundles.emplace_back(Result.Locator, Result.BundleDir);
- m_BundlesCreated = true;
- }
-
- if (!m_HordeClient)
- {
- m_HordeClient = std::make_unique<HordeClient>(m_Config);
- if (!m_HordeClient->Initialize())
- {
- ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!");
- m_AskForAgents.store(false);
- return;
- }
- }
+ BundleResult Result;
+ if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
+ {
+ ZEN_WARN("failed to create bundle, cannot provision any agents!");
+ m_AskForAgents.store(false);
+ m_ShutdownEvent.Set();
+ return false;
}
- if (!m_AskForAgents.load())
+ m_Bundles.emplace_back(Result.Locator, Result.BundleDir);
+ m_BundlesCreated = true;
+ }
+
+ if (!m_HordeClient)
+ {
+ m_HordeClient = std::make_unique<HordeClient>(m_Config);
+ if (!m_HordeClient->Initialize())
{
- return;
+ ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!");
+ m_AskForAgents.store(false);
+ m_ShutdownEvent.Set();
+ return false;
}
+ }
- m_AgentsRequesting.fetch_add(1);
- auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); });
+ return true;
+}
- // Simple backoff: if the last machine request failed, wait up to 5 seconds
- // before trying again.
- //
- // Note however that it's possible that multiple threads enter this code at
- // the same time if multiple agents are requested at once, and they will all
- // see the same last failure time and back off accordingly. We might want to
- // use a semaphore or similar to limit the number of concurrent requests.
+void
+HordeProvisioner::RequestAgent()
+{
+ m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent);
- if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0)
- {
- auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
- const uint64_t ElapsedNs = Now - LastFail;
- const uint64_t ElapsedMs = ElapsedNs / 1'000'000;
- if (ElapsedMs < 5000)
- {
- const uint64_t WaitMs = 5000 - ElapsedMs;
- for (uint64_t Waited = 0; Waited < WaitMs && !Wrapper.ShouldExit.load(); Waited += 100)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
+ if (m_PendingWorkItems.fetch_add(1) == 0)
+ {
+ m_AllWorkDone.Reset();
+ }
- if (Wrapper.ShouldExit.load())
+ GetSmallWorkerPool(EWorkloadType::Background)
+ .ScheduleWork(
+ [this] {
+ ProvisionAgent();
+ if (m_PendingWorkItems.fetch_sub(1) == 1)
{
- return;
+ m_AllWorkDone.Set();
}
- }
- }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+}
- if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load())
- {
- return;
- }
+void
+HordeProvisioner::ProvisionAgent()
+{
+ ZEN_TRACE_CPU("HordeProvisioner::ProvisionAgent");
+
+ // EstimatedCoreCount is incremented speculatively when the agent is requested
+ // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision.
+ auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); });
- std::string RequestBody = m_HordeClient->BuildRequestBody();
+ if (!InitializeHordeClient())
+ {
+ return;
+ }
- // Resolve cluster if needed
- std::string ClusterId = m_Config.Cluster;
- if (ClusterId == HordeConfig::ClusterAuto)
+ if (!m_AskForAgents.load())
+ {
+ return;
+ }
+
+ m_AgentsRequesting.fetch_add(1);
+ auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); });
+
+ // Simple backoff: if the last machine request failed, wait up to 5 seconds
+ // before trying again.
+ //
+ // Note however that it's possible that multiple threads enter this code at
+ // the same time if multiple agents are requested at once, and they will all
+ // see the same last failure time and back off accordingly. We might want to
+ // use a semaphore or similar to limit the number of concurrent requests.
+
+ if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0)
+ {
+ auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
+ const uint64_t ElapsedNs = Now - LastFail;
+ const uint64_t ElapsedMs = ElapsedNs / 1'000'000;
+ if (ElapsedMs < 5000)
{
- ClusterInfo Cluster;
- if (!m_HordeClient->ResolveCluster(RequestBody, Cluster))
+ // Wait on m_ShutdownEvent so shutdown wakes this pool thread immediately instead
+ // of stalling for up to 5s in 100ms sleep chunks. Wait() returns true iff the
+ // event was signaled (shutdown); false means the backoff elapsed normally.
+ const uint64_t WaitMs = 5000 - ElapsedMs;
+ if (m_ShutdownEvent.Wait(static_cast<int>(WaitMs)))
{
- ZEN_WARN("failed to resolve cluster");
- m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
return;
}
- ClusterId = Cluster.ClusterId;
}
+ }
- MachineInfo Machine;
- if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid())
+ if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load())
+ {
+ return;
+ }
+
+ std::string RequestBody = m_HordeClient->BuildRequestBody();
+
+ // Resolve cluster if needed
+ std::string ClusterId = m_Config.Cluster;
+ if (ClusterId == HordeConfig::ClusterAuto)
+ {
+ ClusterInfo Cluster;
+ if (!m_HordeClient->ResolveCluster(RequestBody, Cluster))
{
+ ZEN_WARN("failed to resolve cluster");
m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
return;
}
+ ClusterId = Cluster.ClusterId;
+ }
- m_LastRequestFailTime.store(0);
+ ZEN_INFO("requesting machine from Horde (cluster='{}', cores={}/{})",
+ ClusterId.empty() ? "default" : ClusterId.c_str(),
+ m_ActiveCoreCount.load(),
+ m_TargetCoreCount.load());
- if (Wrapper.ShouldExit.load())
- {
- return;
- }
+ MachineInfo Machine;
+ if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid())
+ {
+ m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+ return;
+ }
- // Connect to agent and perform handshake
- Agent = std::make_unique<HordeAgent>(Machine);
- if (!Agent->IsValid())
- {
- ZEN_WARN("agent creation failed for {}:{}", Machine.GetConnectionAddress(), Machine.GetConnectionPort());
- return;
- }
+ m_LastRequestFailTime.store(0);
- if (!Agent->BeginCommunication())
- {
- ZEN_WARN("BeginCommunication failed");
- return;
- }
+ if (!m_AskForAgents.load())
+ {
+ return;
+ }
- for (auto& [Locator, BundleDir] : m_Bundles)
+ AsyncAgentConfig AgentConfig;
+ AgentConfig.Machine = Machine;
+ AgentConfig.Bundles = m_Bundles;
+ AgentConfig.Args = BuildAgentArgs(Machine);
+
+#if ZEN_PLATFORM_WINDOWS
+ AgentConfig.UseWine = !Machine.IsWindows;
+ AgentConfig.Executable = "zenserver.exe";
+#else
+ AgentConfig.UseWine = false;
+ AgentConfig.Executable = "zenserver";
+#endif
+
+ auto AsyncAgent = std::make_shared<AsyncHordeAgent>(*m_IoContext);
+
+ AsyncAgentEntry Entry;
+ Entry.Agent = AsyncAgent;
+ Entry.LeaseId = Machine.LeaseId;
+ Entry.CoreCount = Machine.LogicalCores;
+
+ const auto [EndpointAddr, EndpointPort] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort);
+ if (EndpointAddr.find(':') != std::string::npos)
+ {
+ Entry.RemoteEndpoint = fmt::format("http://[{}]:{}", EndpointAddr, EndpointPort);
+ }
+ else
+ {
+ Entry.RemoteEndpoint = fmt::format("http://{}:{}", EndpointAddr, EndpointPort);
+ }
+
+ {
+ std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock);
+ m_AsyncAgents.push_back(std::move(Entry));
+ }
+
+ AsyncAgent->Start(std::move(AgentConfig), [this, AsyncAgent](const AsyncAgentResult& Result) {
+ if (Result.CoreCount > 0)
{
- if (Wrapper.ShouldExit.load())
+ // Only subtract estimated cores if not already subtracted by DrainAsyncAgent
+ bool WasDraining = false;
{
- return;
+ std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock);
+ for (const auto& Entry : m_AsyncAgents)
+ {
+ if (Entry.Agent == AsyncAgent)
+ {
+ WasDraining = Entry.Draining;
+ break;
+ }
+ }
}
- if (!Agent->UploadBinaries(BundleDir, Locator))
+ if (!WasDraining)
{
- ZEN_WARN("UploadBinaries failed");
- return;
+ m_EstimatedCoreCount.fetch_sub(Result.CoreCount);
}
+ m_ActiveCoreCount.fetch_sub(Result.CoreCount);
+ m_AgentsActive.fetch_sub(1);
}
+ OnAsyncAgentDone(AsyncAgent);
+ });
- if (Wrapper.ShouldExit.load())
- {
- return;
- }
-
- // Build command line for remote zenserver
- std::vector<std::string> ArgStrings;
- ArgStrings.push_back("compute");
- ArgStrings.push_back("--http=asio");
+ // Track active cores (estimated was already added by RequestAgent)
+ m_EstimatedCoreCount.fetch_add(Machine.LogicalCores);
+ m_ActiveCoreCount.fetch_add(Machine.LogicalCores);
+ m_AgentsActive.fetch_add(1);
+}
- // TEMP HACK - these should be made fully dynamic
- // these are currently here to allow spawning the compute agent locally
- // for debugging purposes (i.e with a local Horde Server+Agent setup)
- ArgStrings.push_back(fmt::format("--port={}", m_Config.ZenServicePort));
- ArgStrings.push_back("--data-dir=c:\\temp\\123");
+void
+HordeProvisioner::DrainAsyncAgent(AsyncAgentEntry& Entry)
+{
+ Entry.Draining = true;
+ m_EstimatedCoreCount.fetch_sub(Entry.CoreCount);
+ m_AgentsDraining.fetch_add(1);
- if (!m_OrchestratorEndpoint.empty())
- {
- ExtendableStringBuilder<256> CoordArg;
- CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint;
- ArgStrings.emplace_back(CoordArg.ToView());
- }
+ HttpClientSettings Settings;
+ Settings.LogCategory = "horde.drain";
+ Settings.ConnectTimeout = std::chrono::milliseconds{5000};
+ Settings.Timeout = std::chrono::milliseconds{10000};
- {
- ExtendableStringBuilder<128> IdArg;
- IdArg << "--instance-id=horde-" << Machine.LeaseId;
- ArgStrings.emplace_back(IdArg.ToView());
- }
+ try
+ {
+ HttpClient Client(Entry.RemoteEndpoint, Settings);
- std::vector<const char*> Args;
- Args.reserve(ArgStrings.size());
- for (const std::string& Arg : ArgStrings)
+ HttpClient::Response Response = Client.Post("/compute/session/drain");
+ if (!Response.IsSuccess())
{
- Args.push_back(Arg.c_str());
+ ZEN_WARN("drain[{}]: POST session/drain failed: HTTP {}", Entry.LeaseId, static_cast<int>(Response.StatusCode));
+ return;
}
-#if ZEN_PLATFORM_WINDOWS
- const bool UseWine = !Machine.IsWindows;
- const char* AppName = "zenserver.exe";
-#else
- const bool UseWine = false;
- const char* AppName = "zenserver";
-#endif
-
- Agent->Execute(AppName, Args.data(), Args.size(), nullptr, nullptr, 0, UseWine);
-
- ZEN_INFO("remote execution started on [{}:{}] lease={}",
- Machine.GetConnectionAddress(),
- Machine.GetConnectionPort(),
- Machine.LeaseId);
-
- MachineCoreCount = Machine.LogicalCores;
- m_EstimatedCoreCount.fetch_add(MachineCoreCount);
- m_ActiveCoreCount.fetch_add(MachineCoreCount);
- m_AgentsActive.fetch_add(1);
+ ZEN_INFO("drain[{}]: session/drain accepted, sending sunset", Entry.LeaseId);
+ (void)Client.Post("/compute/session/sunset");
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("drain[{}]: exception: {}", Entry.LeaseId, Ex.what());
+ }
+}
- // Agent poll loop
-
- auto ActiveGuard = MakeGuard([&]() {
- m_EstimatedCoreCount.fetch_sub(MachineCoreCount);
- m_ActiveCoreCount.fetch_sub(MachineCoreCount);
- m_AgentsActive.fetch_sub(1);
- });
-
- while (Agent->IsValid() && !Wrapper.ShouldExit.load())
+void
+HordeProvisioner::OnAsyncAgentDone(std::shared_ptr<AsyncHordeAgent> Agent)
+{
+ std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock);
+ for (auto It = m_AsyncAgents.begin(); It != m_AsyncAgents.end(); ++It)
{
- const bool LogOutput = false;
- if (!Agent->Poll(LogOutput))
+ if (It->Agent == Agent)
{
+ if (It->Draining)
+ {
+ m_AgentsDraining.fetch_sub(1);
+ std::string WorkerId = "horde-" + It->LeaseId;
+ if (m_RecentlyDrainedWorkerIds.insert(WorkerId).second)
+ {
+ m_RecentlyDrainedOrder.push_back(WorkerId);
+ while (m_RecentlyDrainedOrder.size() > RecentlyDrainedCapacity)
+ {
+ m_RecentlyDrainedWorkerIds.erase(m_RecentlyDrainedOrder.front());
+ m_RecentlyDrainedOrder.pop_front();
+ }
+ }
+ }
+ m_AsyncAgents.erase(It);
break;
}
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}