diff options
Diffstat (limited to 'src/zenhorde/hordeprovisioner.cpp')
| -rw-r--r-- | src/zenhorde/hordeprovisioner.cpp | 682 |
1 files changed, 473 insertions, 209 deletions
diff --git a/src/zenhorde/hordeprovisioner.cpp b/src/zenhorde/hordeprovisioner.cpp index f88c95da2..ea0ea1e83 100644 --- a/src/zenhorde/hordeprovisioner.cpp +++ b/src/zenhorde/hordeprovisioner.cpp @@ -6,49 +6,83 @@ #include "hordeagent.h" #include "hordebundle.h" +#include <zencore/compactbinary.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/thread.h> #include <zencore/trace.h> +#include <zenhttp/httpclient.h> +#include <zenutil/workerpools.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <algorithm> #include <chrono> #include <thread> namespace zen::horde { -struct HordeProvisioner::AgentWrapper -{ - std::thread Thread; - std::atomic<bool> ShouldExit{false}; -}; - HordeProvisioner::HordeProvisioner(const HordeConfig& Config, const std::filesystem::path& BinariesPath, const std::filesystem::path& WorkingDir, - std::string_view OrchestratorEndpoint) + std::string_view OrchestratorEndpoint, + std::string_view CoordinatorSession, + bool CleanStart, + std::string_view TraceHost) : m_Config(Config) , m_BinariesPath(BinariesPath) , m_WorkingDir(WorkingDir) , m_OrchestratorEndpoint(OrchestratorEndpoint) +, m_CoordinatorSession(CoordinatorSession) +, m_CleanStart(CleanStart) +, m_TraceHost(TraceHost) , m_Log(zen::logging::Get("horde.provisioner")) { + m_IoContext = std::make_unique<asio::io_context>(); + + auto Work = asio::make_work_guard(*m_IoContext); + for (int i = 0; i < IoThreadCount; ++i) + { + m_IoThreads.emplace_back([this, i, Work] { + zen::SetCurrentThreadName(fmt::format("horde_io_{}", i)); + m_IoContext->run(); + }); + } } HordeProvisioner::~HordeProvisioner() { - std::lock_guard<std::mutex> Lock(m_AgentsLock); - for (auto& Agent : m_Agents) + m_AskForAgents.store(false); + m_ShutdownEvent.Set(); + + // Shut down async agents and io_context { - Agent->ShouldExit.store(true); + std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock); + for (auto& Entry : m_AsyncAgents) + { + Entry.Agent->Cancel(); + } + m_AsyncAgents.clear(); } - for (auto& Agent : m_Agents) + + m_IoContext->stop(); + + for (auto& Thread : m_IoThreads) { - if (Agent->Thread.joinable()) + if (Thread.joinable()) { - Agent->Thread.join(); + Thread.join(); } } + + // Wait for all pool work items to finish before destroying members they reference + if (m_PendingWorkItems.load() > 0) + { + m_AllWorkDone.Wait(); + } } void @@ -56,9 +90,23 @@ HordeProvisioner::SetTargetCoreCount(uint32_t Count) { ZEN_TRACE_CPU("HordeProvisioner::SetTargetCoreCount"); - m_TargetCoreCount.store(std::min(Count, static_cast<uint32_t>(m_Config.MaxCores))); + const uint32_t ClampedCount = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)); + const uint32_t PreviousTarget = m_TargetCoreCount.exchange(ClampedCount); + + if (ClampedCount != PreviousTarget) + { + ZEN_INFO("target core count changed: {} -> {} (active={}, estimated={})", + PreviousTarget, + ClampedCount, + m_ActiveCoreCount.load(), + m_EstimatedCoreCount.load()); + } - while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) + // Only provision if the gap is at least one agent-sized chunk. Without + // this, draining a 32-core agent to cover a 28-core excess would leave a + // 4-core gap that triggers a 32-core provision, which triggers another + // drain, ad infinitum. + while (m_EstimatedCoreCount.load() + EstimatedCoresPerAgent <= m_TargetCoreCount.load()) { if (!m_AskForAgents.load()) { @@ -67,21 +115,108 @@ HordeProvisioner::SetTargetCoreCount(uint32_t Count) RequestAgent(); } - // Clean up finished agent threads - std::lock_guard<std::mutex> Lock(m_AgentsLock); - for (auto It = m_Agents.begin(); It != m_Agents.end();) + // Scale down async agents { - if ((*It)->ShouldExit.load()) + std::lock_guard<std::mutex> AsyncLock(m_AsyncAgentsLock); + + uint32_t AsyncActive = m_ActiveCoreCount.load(); + uint32_t AsyncTarget = m_TargetCoreCount.load(); + + uint32_t AlreadyDrainingCores = 0; + for (const auto& Entry : m_AsyncAgents) { - if ((*It)->Thread.joinable()) + if (Entry.Draining) { - (*It)->Thread.join(); + AlreadyDrainingCores += Entry.CoreCount; } - It = m_Agents.erase(It); } - else + + uint32_t EffectiveAsync = (AsyncActive > AlreadyDrainingCores) ? AsyncActive - AlreadyDrainingCores : 0; + + if (EffectiveAsync > AsyncTarget) { - ++It; + struct Candidate + { + AsyncAgentEntry* Entry; + int Workload; + }; + std::vector<Candidate> Candidates; + + for (auto& Entry : m_AsyncAgents) + { + if (Entry.Draining || Entry.RemoteEndpoint.empty()) + { + continue; + } + + int Workload = 0; + bool Reachable = false; + HttpClientSettings Settings; + Settings.LogCategory = "horde.drain"; + Settings.ConnectTimeout = std::chrono::milliseconds{2000}; + Settings.Timeout = std::chrono::milliseconds{3000}; + try + { + HttpClient Client(Entry.RemoteEndpoint, Settings); + HttpClient::Response Resp = Client.Get("/compute/session/status"); + if (Resp.IsSuccess()) + { + CbObject Status = Resp.AsObject(); + Workload = Status["actions_pending"].AsInt32(0) + Status["actions_running"].AsInt32(0); + Reachable = true; + } + } + catch (const std::exception& Ex) + { + ZEN_DEBUG("agent lease={} not yet reachable for drain: {}", Entry.LeaseId, Ex.what()); + } + + if (Reachable) + { + Candidates.push_back({&Entry, Workload}); + } + } + + const uint32_t ExcessCores = EffectiveAsync - AsyncTarget; + uint32_t CoresDrained = 0; + + while (CoresDrained < ExcessCores && !Candidates.empty()) + { + const uint32_t Remaining = ExcessCores - CoresDrained; + + Candidates.erase(std::remove_if(Candidates.begin(), + Candidates.end(), + [Remaining](const Candidate& C) { return C.Entry->CoreCount > Remaining; }), + Candidates.end()); + + if (Candidates.empty()) + { + break; + } + + Candidate* Best = &Candidates[0]; + for (auto& C : Candidates) + { + if (C.Entry->CoreCount > Best->Entry->CoreCount || + (C.Entry->CoreCount == Best->Entry->CoreCount && C.Workload < Best->Workload)) + { + Best = &C; + } + } + + ZEN_INFO("draining async agent lease={} ({} cores, workload={})", + Best->Entry->LeaseId, + Best->Entry->CoreCount, + Best->Workload); + + DrainAsyncAgent(*Best->Entry); + CoresDrained += Best->Entry->CoreCount; + + AsyncAgentEntry* Drained = Best->Entry; + Candidates.erase( + std::remove_if(Candidates.begin(), Candidates.end(), [Drained](const Candidate& C) { return C.Entry == Drained; }), + Candidates.end()); + } } } } @@ -101,266 +236,395 @@ HordeProvisioner::GetStats() const uint32_t HordeProvisioner::GetAgentCount() const { - std::lock_guard<std::mutex> Lock(m_AgentsLock); - return static_cast<uint32_t>(m_Agents.size()); + std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock); + return static_cast<uint32_t>(m_AsyncAgents.size()); } -void -HordeProvisioner::RequestAgent() +compute::AgentProvisioningStatus +HordeProvisioner::GetAgentStatus(std::string_view WorkerId) const { - m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent); + // Worker IDs are "horde-{LeaseId}" - strip the prefix to match lease ID + constexpr std::string_view Prefix = "horde-"; + if (!WorkerId.starts_with(Prefix)) + { + return compute::AgentProvisioningStatus::Unknown; + } + std::string_view LeaseId = WorkerId.substr(Prefix.size()); - std::lock_guard<std::mutex> Lock(m_AgentsLock); + std::lock_guard<std::mutex> AsyncLock(m_AsyncAgentsLock); + for (const auto& Entry : m_AsyncAgents) + { + if (Entry.LeaseId == LeaseId) + { + if (Entry.Draining) + { + return compute::AgentProvisioningStatus::Draining; + } + return compute::AgentProvisioningStatus::Active; + } + } - auto Wrapper = std::make_unique<AgentWrapper>(); - AgentWrapper& Ref = *Wrapper; - Wrapper->Thread = std::thread([this, &Ref] { ThreadAgent(Ref); }); + // Check recently-drained agents that have already been cleaned up + std::string WorkerIdStr(WorkerId); + if (m_RecentlyDrainedWorkerIds.erase(WorkerIdStr) > 0) + { + // Also remove from the ordering queue so size accounting stays consistent. + auto It = std::find(m_RecentlyDrainedOrder.begin(), m_RecentlyDrainedOrder.end(), WorkerIdStr); + if (It != m_RecentlyDrainedOrder.end()) + { + m_RecentlyDrainedOrder.erase(It); + } + return compute::AgentProvisioningStatus::Draining; + } - m_Agents.push_back(std::move(Wrapper)); + return compute::AgentProvisioningStatus::Unknown; } -void -HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper) +std::vector<std::string> +HordeProvisioner::BuildAgentArgs(const MachineInfo& Machine) const { - ZEN_TRACE_CPU("HordeProvisioner::ThreadAgent"); + std::vector<std::string> Args; + Args.emplace_back("compute"); + Args.emplace_back("--http=asio"); + Args.push_back(fmt::format("--port={}", m_Config.ZenServicePort)); + Args.emplace_back("--data-dir=%UE_HORDE_SHARED_DIR%\\zen"); - static std::atomic<uint32_t> ThreadIndex{0}; - const uint32_t CurrentIndex = ThreadIndex.fetch_add(1); + if (m_CleanStart) + { + Args.emplace_back("--clean"); + } - zen::SetCurrentThreadName(fmt::format("horde_agent_{}", CurrentIndex)); + if (!m_OrchestratorEndpoint.empty()) + { + ExtendableStringBuilder<256> CoordArg; + CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint; + Args.emplace_back(CoordArg.ToView()); + } - std::unique_ptr<HordeAgent> Agent; - uint32_t MachineCoreCount = 0; + { + ExtendableStringBuilder<128> IdArg; + IdArg << "--instance-id=horde-" << Machine.LeaseId; + Args.emplace_back(IdArg.ToView()); + } - auto _ = MakeGuard([&] { - if (Agent) - { - Agent->CloseConnection(); - } - Wrapper.ShouldExit.store(true); - }); + if (!m_CoordinatorSession.empty()) + { + ExtendableStringBuilder<128> SessionArg; + SessionArg << "--coordinator-session=" << m_CoordinatorSession; + Args.emplace_back(SessionArg.ToView()); + } + if (!m_TraceHost.empty()) { - // EstimatedCoreCount is incremented speculatively when the agent is requested - // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision. - auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); }); + ExtendableStringBuilder<128> TraceArg; + TraceArg << "--tracehost=" << m_TraceHost; + Args.emplace_back(TraceArg.ToView()); + } + // In relay mode, the remote zenserver's local address is not reachable from the + // orchestrator. Pass the relay-visible endpoint so it announces the correct URL. + if (Machine.Mode == ConnectionMode::Relay) + { + const auto [Addr, Port] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort); + if (Addr.find(':') != std::string::npos) + { + Args.push_back(fmt::format("--announce-url=http://[{}]:{}", Addr, Port)); + } + else { - ZEN_TRACE_CPU("HordeProvisioner::CreateBundles"); + Args.push_back(fmt::format("--announce-url=http://{}:{}", Addr, Port)); + } + } - std::lock_guard<std::mutex> BundleLock(m_BundleLock); + return Args; +} - if (!m_BundlesCreated) - { - const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles"; +bool +HordeProvisioner::InitializeHordeClient() +{ + ZEN_TRACE_CPU("HordeProvisioner::InitializeHordeClient"); + + std::lock_guard<std::mutex> BundleLock(m_BundleLock); - std::vector<BundleFile> Files; + if (!m_BundlesCreated) + { + const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles"; + + std::vector<BundleFile> Files; #if ZEN_PLATFORM_WINDOWS - Files.emplace_back(m_BinariesPath / "zenserver.exe", false); + Files.emplace_back(m_BinariesPath / "zenserver.exe", false); + Files.emplace_back(m_BinariesPath / "zenserver.pdb", true); #elif ZEN_PLATFORM_LINUX - Files.emplace_back(m_BinariesPath / "zenserver", false); - Files.emplace_back(m_BinariesPath / "zenserver.debug", true); + Files.emplace_back(m_BinariesPath / "zenserver", false); + Files.emplace_back(m_BinariesPath / "zenserver.debug", true); #elif ZEN_PLATFORM_MAC - Files.emplace_back(m_BinariesPath / "zenserver", false); + Files.emplace_back(m_BinariesPath / "zenserver", false); #endif - BundleResult Result; - if (!BundleCreator::CreateBundle(Files, OutputDir, Result)) - { - ZEN_WARN("failed to create bundle, cannot provision any agents!"); - m_AskForAgents.store(false); - return; - } - - m_Bundles.emplace_back(Result.Locator, Result.BundleDir); - m_BundlesCreated = true; - } - - if (!m_HordeClient) - { - m_HordeClient = std::make_unique<HordeClient>(m_Config); - if (!m_HordeClient->Initialize()) - { - ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!"); - m_AskForAgents.store(false); - return; - } - } + BundleResult Result; + if (!BundleCreator::CreateBundle(Files, OutputDir, Result)) + { + ZEN_WARN("failed to create bundle, cannot provision any agents!"); + m_AskForAgents.store(false); + m_ShutdownEvent.Set(); + return false; } - if (!m_AskForAgents.load()) + m_Bundles.emplace_back(Result.Locator, Result.BundleDir); + m_BundlesCreated = true; + } + + if (!m_HordeClient) + { + m_HordeClient = std::make_unique<HordeClient>(m_Config); + if (!m_HordeClient->Initialize()) { - return; + ZEN_WARN("failed to initialize Horde HTTP client, cannot provision any agents!"); + m_AskForAgents.store(false); + m_ShutdownEvent.Set(); + return false; } + } - m_AgentsRequesting.fetch_add(1); - auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); }); + return true; +} - // Simple backoff: if the last machine request failed, wait up to 5 seconds - // before trying again. - // - // Note however that it's possible that multiple threads enter this code at - // the same time if multiple agents are requested at once, and they will all - // see the same last failure time and back off accordingly. We might want to - // use a semaphore or similar to limit the number of concurrent requests. +void +HordeProvisioner::RequestAgent() +{ + m_EstimatedCoreCount.fetch_add(EstimatedCoresPerAgent); - if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0) - { - auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); - const uint64_t ElapsedNs = Now - LastFail; - const uint64_t ElapsedMs = ElapsedNs / 1'000'000; - if (ElapsedMs < 5000) - { - const uint64_t WaitMs = 5000 - ElapsedMs; - for (uint64_t Waited = 0; Waited < WaitMs && !Wrapper.ShouldExit.load(); Waited += 100) - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } + if (m_PendingWorkItems.fetch_add(1) == 0) + { + m_AllWorkDone.Reset(); + } - if (Wrapper.ShouldExit.load()) + GetSmallWorkerPool(EWorkloadType::Background) + .ScheduleWork( + [this] { + ProvisionAgent(); + if (m_PendingWorkItems.fetch_sub(1) == 1) { - return; + m_AllWorkDone.Set(); } - } - } + }, + WorkerThreadPool::EMode::EnableBacklog); +} - if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load()) - { - return; - } +void +HordeProvisioner::ProvisionAgent() +{ + ZEN_TRACE_CPU("HordeProvisioner::ProvisionAgent"); + + // EstimatedCoreCount is incremented speculatively when the agent is requested + // (in RequestAgent) so that SetTargetCoreCount doesn't over-provision. + auto $ = MakeGuard([&] { m_EstimatedCoreCount.fetch_sub(EstimatedCoresPerAgent); }); - std::string RequestBody = m_HordeClient->BuildRequestBody(); + if (!InitializeHordeClient()) + { + return; + } - // Resolve cluster if needed - std::string ClusterId = m_Config.Cluster; - if (ClusterId == HordeConfig::ClusterAuto) + if (!m_AskForAgents.load()) + { + return; + } + + m_AgentsRequesting.fetch_add(1); + auto ReqGuard = MakeGuard([this] { m_AgentsRequesting.fetch_sub(1); }); + + // Simple backoff: if the last machine request failed, wait up to 5 seconds + // before trying again. + // + // Note however that it's possible that multiple threads enter this code at + // the same time if multiple agents are requested at once, and they will all + // see the same last failure time and back off accordingly. We might want to + // use a semaphore or similar to limit the number of concurrent requests. + + if (const uint64_t LastFail = m_LastRequestFailTime.load(); LastFail != 0) + { + auto Now = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()); + const uint64_t ElapsedNs = Now - LastFail; + const uint64_t ElapsedMs = ElapsedNs / 1'000'000; + if (ElapsedMs < 5000) { - ClusterInfo Cluster; - if (!m_HordeClient->ResolveCluster(RequestBody, Cluster)) + // Wait on m_ShutdownEvent so shutdown wakes this pool thread immediately instead + // of stalling for up to 5s in 100ms sleep chunks. Wait() returns true iff the + // event was signaled (shutdown); false means the backoff elapsed normally. + const uint64_t WaitMs = 5000 - ElapsedMs; + if (m_ShutdownEvent.Wait(static_cast<int>(WaitMs))) { - ZEN_WARN("failed to resolve cluster"); - m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count())); return; } - ClusterId = Cluster.ClusterId; } + } - MachineInfo Machine; - if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid()) + if (m_ActiveCoreCount.load() >= m_TargetCoreCount.load()) + { + return; + } + + std::string RequestBody = m_HordeClient->BuildRequestBody(); + + // Resolve cluster if needed + std::string ClusterId = m_Config.Cluster; + if (ClusterId == HordeConfig::ClusterAuto) + { + ClusterInfo Cluster; + if (!m_HordeClient->ResolveCluster(RequestBody, Cluster)) { + ZEN_WARN("failed to resolve cluster"); m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count())); return; } + ClusterId = Cluster.ClusterId; + } - m_LastRequestFailTime.store(0); + ZEN_INFO("requesting machine from Horde (cluster='{}', cores={}/{})", + ClusterId.empty() ? "default" : ClusterId.c_str(), + m_ActiveCoreCount.load(), + m_TargetCoreCount.load()); - if (Wrapper.ShouldExit.load()) - { - return; - } + MachineInfo Machine; + if (!m_HordeClient->RequestMachine(RequestBody, ClusterId, /* out */ Machine) || !Machine.IsValid()) + { + m_LastRequestFailTime.store(static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count())); + return; + } - // Connect to agent and perform handshake - Agent = std::make_unique<HordeAgent>(Machine); - if (!Agent->IsValid()) - { - ZEN_WARN("agent creation failed for {}:{}", Machine.GetConnectionAddress(), Machine.GetConnectionPort()); - return; - } + m_LastRequestFailTime.store(0); - if (!Agent->BeginCommunication()) - { - ZEN_WARN("BeginCommunication failed"); - return; - } + if (!m_AskForAgents.load()) + { + return; + } - for (auto& [Locator, BundleDir] : m_Bundles) + AsyncAgentConfig AgentConfig; + AgentConfig.Machine = Machine; + AgentConfig.Bundles = m_Bundles; + AgentConfig.Args = BuildAgentArgs(Machine); + +#if ZEN_PLATFORM_WINDOWS + AgentConfig.UseWine = !Machine.IsWindows; + AgentConfig.Executable = "zenserver.exe"; +#else + AgentConfig.UseWine = false; + AgentConfig.Executable = "zenserver"; +#endif + + auto AsyncAgent = std::make_shared<AsyncHordeAgent>(*m_IoContext); + + AsyncAgentEntry Entry; + Entry.Agent = AsyncAgent; + Entry.LeaseId = Machine.LeaseId; + Entry.CoreCount = Machine.LogicalCores; + + const auto [EndpointAddr, EndpointPort] = Machine.GetZenServiceEndpoint(m_Config.ZenServicePort); + if (EndpointAddr.find(':') != std::string::npos) + { + Entry.RemoteEndpoint = fmt::format("http://[{}]:{}", EndpointAddr, EndpointPort); + } + else + { + Entry.RemoteEndpoint = fmt::format("http://{}:{}", EndpointAddr, EndpointPort); + } + + { + std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock); + m_AsyncAgents.push_back(std::move(Entry)); + } + + AsyncAgent->Start(std::move(AgentConfig), [this, AsyncAgent](const AsyncAgentResult& Result) { + if (Result.CoreCount > 0) { - if (Wrapper.ShouldExit.load()) + // Only subtract estimated cores if not already subtracted by DrainAsyncAgent + bool WasDraining = false; { - return; + std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock); + for (const auto& Entry : m_AsyncAgents) + { + if (Entry.Agent == AsyncAgent) + { + WasDraining = Entry.Draining; + break; + } + } } - if (!Agent->UploadBinaries(BundleDir, Locator)) + if (!WasDraining) { - ZEN_WARN("UploadBinaries failed"); - return; + m_EstimatedCoreCount.fetch_sub(Result.CoreCount); } + m_ActiveCoreCount.fetch_sub(Result.CoreCount); + m_AgentsActive.fetch_sub(1); } + OnAsyncAgentDone(AsyncAgent); + }); - if (Wrapper.ShouldExit.load()) - { - return; - } - - // Build command line for remote zenserver - std::vector<std::string> ArgStrings; - ArgStrings.push_back("compute"); - ArgStrings.push_back("--http=asio"); + // Track active cores (estimated was already added by RequestAgent) + m_EstimatedCoreCount.fetch_add(Machine.LogicalCores); + m_ActiveCoreCount.fetch_add(Machine.LogicalCores); + m_AgentsActive.fetch_add(1); +} - // TEMP HACK - these should be made fully dynamic - // these are currently here to allow spawning the compute agent locally - // for debugging purposes (i.e with a local Horde Server+Agent setup) - ArgStrings.push_back(fmt::format("--port={}", m_Config.ZenServicePort)); - ArgStrings.push_back("--data-dir=c:\\temp\\123"); +void +HordeProvisioner::DrainAsyncAgent(AsyncAgentEntry& Entry) +{ + Entry.Draining = true; + m_EstimatedCoreCount.fetch_sub(Entry.CoreCount); + m_AgentsDraining.fetch_add(1); - if (!m_OrchestratorEndpoint.empty()) - { - ExtendableStringBuilder<256> CoordArg; - CoordArg << "--coordinator-endpoint=" << m_OrchestratorEndpoint; - ArgStrings.emplace_back(CoordArg.ToView()); - } + HttpClientSettings Settings; + Settings.LogCategory = "horde.drain"; + Settings.ConnectTimeout = std::chrono::milliseconds{5000}; + Settings.Timeout = std::chrono::milliseconds{10000}; - { - ExtendableStringBuilder<128> IdArg; - IdArg << "--instance-id=horde-" << Machine.LeaseId; - ArgStrings.emplace_back(IdArg.ToView()); - } + try + { + HttpClient Client(Entry.RemoteEndpoint, Settings); - std::vector<const char*> Args; - Args.reserve(ArgStrings.size()); - for (const std::string& Arg : ArgStrings) + HttpClient::Response Response = Client.Post("/compute/session/drain"); + if (!Response.IsSuccess()) { - Args.push_back(Arg.c_str()); + ZEN_WARN("drain[{}]: POST session/drain failed: HTTP {}", Entry.LeaseId, static_cast<int>(Response.StatusCode)); + return; } -#if ZEN_PLATFORM_WINDOWS - const bool UseWine = !Machine.IsWindows; - const char* AppName = "zenserver.exe"; -#else - const bool UseWine = false; - const char* AppName = "zenserver"; -#endif - - Agent->Execute(AppName, Args.data(), Args.size(), nullptr, nullptr, 0, UseWine); - - ZEN_INFO("remote execution started on [{}:{}] lease={}", - Machine.GetConnectionAddress(), - Machine.GetConnectionPort(), - Machine.LeaseId); - - MachineCoreCount = Machine.LogicalCores; - m_EstimatedCoreCount.fetch_add(MachineCoreCount); - m_ActiveCoreCount.fetch_add(MachineCoreCount); - m_AgentsActive.fetch_add(1); + ZEN_INFO("drain[{}]: session/drain accepted, sending sunset", Entry.LeaseId); + (void)Client.Post("/compute/session/sunset"); } + catch (const std::exception& Ex) + { + ZEN_WARN("drain[{}]: exception: {}", Entry.LeaseId, Ex.what()); + } +} - // Agent poll loop - - auto ActiveGuard = MakeGuard([&]() { - m_EstimatedCoreCount.fetch_sub(MachineCoreCount); - m_ActiveCoreCount.fetch_sub(MachineCoreCount); - m_AgentsActive.fetch_sub(1); - }); - - while (Agent->IsValid() && !Wrapper.ShouldExit.load()) +void +HordeProvisioner::OnAsyncAgentDone(std::shared_ptr<AsyncHordeAgent> Agent) +{ + std::lock_guard<std::mutex> Lock(m_AsyncAgentsLock); + for (auto It = m_AsyncAgents.begin(); It != m_AsyncAgents.end(); ++It) { - const bool LogOutput = false; - if (!Agent->Poll(LogOutput)) + if (It->Agent == Agent) { + if (It->Draining) + { + m_AgentsDraining.fetch_sub(1); + std::string WorkerId = "horde-" + It->LeaseId; + if (m_RecentlyDrainedWorkerIds.insert(WorkerId).second) + { + m_RecentlyDrainedOrder.push_back(WorkerId); + while (m_RecentlyDrainedOrder.size() > RecentlyDrainedCapacity) + { + m_RecentlyDrainedWorkerIds.erase(m_RecentlyDrainedOrder.front()); + m_RecentlyDrainedOrder.pop_front(); + } + } + } + m_AsyncAgents.erase(It); break; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } |