// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include namespace zen::nomad { NomadProvisioner::NomadProvisioner(const NomadConfig& Config, std::string_view OrchestratorEndpoint) : m_Config(Config) , m_OrchestratorEndpoint(OrchestratorEndpoint) , m_ProcessId(static_cast(zen::GetCurrentProcessId())) , m_Log(zen::logging::Get("nomad.provisioner")) { ZEN_DEBUG("initializing provisioner (server: {}, driver: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})", m_Config.ServerUrl, ToString(m_Config.TaskDriver), m_Config.MaxCores, m_Config.CoresPerJob, m_Config.MaxJobs); m_Client = std::make_unique(m_Config); if (!m_Client->Initialize()) { ZEN_ERROR("failed to initialize Nomad HTTP client"); return; } ZEN_DEBUG("Nomad HTTP client initialized, starting management thread"); m_Thread = std::thread([this] { ManagementThread(); }); } NomadProvisioner::~NomadProvisioner() { ZEN_DEBUG("provisioner shutting down"); m_ShouldExit.store(true); m_WakeCV.notify_all(); if (m_Thread.joinable()) { m_Thread.join(); } StopAllJobs(); ZEN_DEBUG("provisioner shutdown complete"); } void NomadProvisioner::SetTargetCoreCount(uint32_t Count) { const uint32_t Clamped = std::min(Count, static_cast(m_Config.MaxCores)); const uint32_t Previous = m_TargetCoreCount.exchange(Clamped); if (Clamped != Previous) { ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped); } m_WakeCV.notify_all(); } NomadProvisioningStats NomadProvisioner::GetStats() const { NomadProvisioningStats Stats; Stats.TargetCoreCount = m_TargetCoreCount.load(); Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); Stats.RunningJobCount = m_RunningJobCount.load(); { std::lock_guard Lock(m_JobsLock); Stats.ActiveJobCount = static_cast(m_Jobs.size()); } return Stats; } std::string NomadProvisioner::GenerateJobId() { const uint32_t Index = m_JobIndex.fetch_add(1); ExtendableStringBuilder<128> Builder; Builder << m_Config.JobPrefix << "-" << m_ProcessId << "-" << Index; return std::string(Builder.ToView()); } void NomadProvisioner::ManagementThread() { ZEN_TRACE_CPU("Nomad_Mgmt"); zen::SetCurrentThreadName("nomad_mgmt"); ZEN_INFO("Nomad management thread started"); while (!m_ShouldExit.load()) { ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}", m_TargetCoreCount.load(), m_EstimatedCoreCount.load(), m_RunningJobCount.load(), [this] { std::lock_guard Lock(m_JobsLock); return m_Jobs.size(); }()); SubmitNewJobs(); PollExistingJobs(); CleanupDeadJobs(); // Wait up to 5 seconds or until woken std::unique_lock Lock(m_WakeMutex); m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); }); } ZEN_INFO("Nomad management thread exiting"); } void NomadProvisioner::SubmitNewJobs() { ZEN_TRACE_CPU("NomadProvisioner::SubmitNewJobs"); const uint32_t CoresPerJob = static_cast(m_Config.CoresPerJob); while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) { { std::lock_guard Lock(m_JobsLock); if (static_cast(m_Jobs.size()) >= m_Config.MaxJobs) { ZEN_INFO("Nomad max jobs limit reached ({})", m_Config.MaxJobs); break; } } if (m_ShouldExit.load()) { break; } const std::string JobId = GenerateJobId(); ZEN_DEBUG("submitting job '{}' (estimated: {}, target: {})", JobId, m_EstimatedCoreCount.load(), m_TargetCoreCount.load()); const std::string JobJson = m_Client->BuildJobJson(JobId, m_OrchestratorEndpoint); NomadJobInfo JobInfo; JobInfo.Id = JobId; if (!m_Client->SubmitJob(JobJson, JobInfo)) { ZEN_WARN("failed to submit Nomad job '{}'", JobId); break; } TrackedJob Tracked; Tracked.JobId = JobId; Tracked.Status = "pending"; Tracked.Cores = static_cast(CoresPerJob); { std::lock_guard Lock(m_JobsLock); m_Jobs.push_back(std::move(Tracked)); } m_EstimatedCoreCount.fetch_add(CoresPerJob); ZEN_INFO("Nomad job '{}' submitted (estimated cores: {})", JobId, m_EstimatedCoreCount.load()); } } void NomadProvisioner::PollExistingJobs() { ZEN_TRACE_CPU("NomadProvisioner::PollExistingJobs"); std::lock_guard Lock(m_JobsLock); for (auto& Job : m_Jobs) { if (m_ShouldExit.load()) { break; } NomadJobInfo Info; if (!m_Client->GetJobStatus(Job.JobId, Info)) { ZEN_DEBUG("failed to poll status for job '{}'", Job.JobId); continue; } const std::string PrevStatus = Job.Status; Job.Status = Info.Status; if (PrevStatus != Job.Status) { ZEN_INFO("Nomad job '{}' status changed: {} -> {}", Job.JobId, PrevStatus, Job.Status); if (Job.Status == "running" && PrevStatus != "running") { m_RunningJobCount.fetch_add(1); } else if (Job.Status != "running" && PrevStatus == "running") { m_RunningJobCount.fetch_sub(1); } } } } void NomadProvisioner::CleanupDeadJobs() { ZEN_TRACE_CPU("NomadProvisioner::CleanupDeadJobs"); std::lock_guard Lock(m_JobsLock); for (auto It = m_Jobs.begin(); It != m_Jobs.end();) { if (It->Status == "dead") { ZEN_INFO("Nomad job '{}' is dead, removing from tracked jobs", It->JobId); m_EstimatedCoreCount.fetch_sub(static_cast(It->Cores)); It = m_Jobs.erase(It); } else { ++It; } } } void NomadProvisioner::StopAllJobs() { ZEN_TRACE_CPU("NomadProvisioner::StopAllJobs"); std::lock_guard Lock(m_JobsLock); for (const auto& Job : m_Jobs) { ZEN_INFO("stopping Nomad job '{}' during shutdown", Job.JobId); m_Client->StopJob(Job.JobId); } m_Jobs.clear(); m_EstimatedCoreCount.store(0); m_RunningJobCount.store(0); } } // namespace zen::nomad