diff options
Diffstat (limited to 'src/zennomad/nomadprovisioner.cpp')
| -rw-r--r-- | src/zennomad/nomadprovisioner.cpp | 264 |
1 file changed, 264 insertions, 0 deletions
diff --git a/src/zennomad/nomadprovisioner.cpp b/src/zennomad/nomadprovisioner.cpp new file mode 100644 index 000000000..3fe9c0ac3 --- /dev/null +++ b/src/zennomad/nomadprovisioner.cpp @@ -0,0 +1,264 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zennomad/nomadclient.h> +#include <zennomad/nomadprovisioner.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/process.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <chrono> + +namespace zen::nomad { + +NomadProvisioner::NomadProvisioner(const NomadConfig& Config, std::string_view OrchestratorEndpoint) +: m_Config(Config) +, m_OrchestratorEndpoint(OrchestratorEndpoint) +, m_ProcessId(static_cast<uint32_t>(zen::GetCurrentProcessId())) +, m_Log(zen::logging::Get("nomad.provisioner")) +{ + ZEN_DEBUG("initializing provisioner (server: {}, driver: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})", + m_Config.ServerUrl, + ToString(m_Config.TaskDriver), + m_Config.MaxCores, + m_Config.CoresPerJob, + m_Config.MaxJobs); + + m_Client = std::make_unique<NomadClient>(m_Config); + if (!m_Client->Initialize()) + { + ZEN_ERROR("failed to initialize Nomad HTTP client"); + return; + } + + ZEN_DEBUG("Nomad HTTP client initialized, starting management thread"); + + m_Thread = std::thread([this] { ManagementThread(); }); +} + +NomadProvisioner::~NomadProvisioner() +{ + ZEN_DEBUG("provisioner shutting down"); + + m_ShouldExit.store(true); + m_WakeCV.notify_all(); + + if (m_Thread.joinable()) + { + m_Thread.join(); + } + + StopAllJobs(); + + ZEN_DEBUG("provisioner shutdown complete"); +} + +void +NomadProvisioner::SetTargetCoreCount(uint32_t Count) +{ + const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)); + const uint32_t Previous = m_TargetCoreCount.exchange(Clamped); + + if (Clamped != Previous) + { + ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped); + } + + 
m_WakeCV.notify_all(); +} + +NomadProvisioningStats +NomadProvisioner::GetStats() const +{ + NomadProvisioningStats Stats; + Stats.TargetCoreCount = m_TargetCoreCount.load(); + Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); + Stats.RunningJobCount = m_RunningJobCount.load(); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size()); + } + + return Stats; +} + +std::string +NomadProvisioner::GenerateJobId() +{ + const uint32_t Index = m_JobIndex.fetch_add(1); + + ExtendableStringBuilder<128> Builder; + Builder << m_Config.JobPrefix << "-" << m_ProcessId << "-" << Index; + return std::string(Builder.ToView()); +} + +void +NomadProvisioner::ManagementThread() +{ + ZEN_TRACE_CPU("Nomad_Mgmt"); + zen::SetCurrentThreadName("nomad_mgmt"); + + ZEN_INFO("Nomad management thread started"); + + while (!m_ShouldExit.load()) + { + ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}", + m_TargetCoreCount.load(), + m_EstimatedCoreCount.load(), + m_RunningJobCount.load(), + [this] { + std::lock_guard<std::mutex> Lock(m_JobsLock); + return m_Jobs.size(); + }()); + + SubmitNewJobs(); + PollExistingJobs(); + CleanupDeadJobs(); + + // Wait up to 5 seconds or until woken + std::unique_lock<std::mutex> Lock(m_WakeMutex); + m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); }); + } + + ZEN_INFO("Nomad management thread exiting"); +} + +void +NomadProvisioner::SubmitNewJobs() +{ + ZEN_TRACE_CPU("NomadProvisioner::SubmitNewJobs"); + + const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob); + + while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) + { + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs) + { + ZEN_INFO("Nomad max jobs limit reached ({})", m_Config.MaxJobs); + break; + } + } + + if (m_ShouldExit.load()) + { + break; + } + + const std::string JobId = GenerateJobId(); + + 
ZEN_DEBUG("submitting job '{}' (estimated: {}, target: {})", JobId, m_EstimatedCoreCount.load(), m_TargetCoreCount.load()); + + const std::string JobJson = m_Client->BuildJobJson(JobId, m_OrchestratorEndpoint); + + NomadJobInfo JobInfo; + JobInfo.Id = JobId; + + if (!m_Client->SubmitJob(JobJson, JobInfo)) + { + ZEN_WARN("failed to submit Nomad job '{}'", JobId); + break; + } + + TrackedJob Tracked; + Tracked.JobId = JobId; + Tracked.Status = "pending"; + Tracked.Cores = static_cast<int>(CoresPerJob); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + m_Jobs.push_back(std::move(Tracked)); + } + + m_EstimatedCoreCount.fetch_add(CoresPerJob); + + ZEN_INFO("Nomad job '{}' submitted (estimated cores: {})", JobId, m_EstimatedCoreCount.load()); + } +} + +void +NomadProvisioner::PollExistingJobs() +{ + ZEN_TRACE_CPU("NomadProvisioner::PollExistingJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto& Job : m_Jobs) + { + if (m_ShouldExit.load()) + { + break; + } + + NomadJobInfo Info; + if (!m_Client->GetJobStatus(Job.JobId, Info)) + { + ZEN_DEBUG("failed to poll status for job '{}'", Job.JobId); + continue; + } + + const std::string PrevStatus = Job.Status; + Job.Status = Info.Status; + + if (PrevStatus != Job.Status) + { + ZEN_INFO("Nomad job '{}' status changed: {} -> {}", Job.JobId, PrevStatus, Job.Status); + + if (Job.Status == "running" && PrevStatus != "running") + { + m_RunningJobCount.fetch_add(1); + } + else if (Job.Status != "running" && PrevStatus == "running") + { + m_RunningJobCount.fetch_sub(1); + } + } + } +} + +void +NomadProvisioner::CleanupDeadJobs() +{ + ZEN_TRACE_CPU("NomadProvisioner::CleanupDeadJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto It = m_Jobs.begin(); It != m_Jobs.end();) + { + if (It->Status == "dead") + { + ZEN_INFO("Nomad job '{}' is dead, removing from tracked jobs", It->JobId); + m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores)); + It = m_Jobs.erase(It); + } + else + { + 
++It; + } + } +} + +void +NomadProvisioner::StopAllJobs() +{ + ZEN_TRACE_CPU("NomadProvisioner::StopAllJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (const auto& Job : m_Jobs) + { + ZEN_INFO("stopping Nomad job '{}' during shutdown", Job.JobId); + m_Client->StopJob(Job.JobId); + } + + m_Jobs.clear(); + m_EstimatedCoreCount.store(0); + m_RunningJobCount.store(0); +} + +} // namespace zen::nomad |