aboutsummaryrefslogtreecommitdiff
path: root/src/zennomad/nomadprovisioner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zennomad/nomadprovisioner.cpp')
-rw-r--r--src/zennomad/nomadprovisioner.cpp264
1 files changed, 264 insertions, 0 deletions
diff --git a/src/zennomad/nomadprovisioner.cpp b/src/zennomad/nomadprovisioner.cpp
new file mode 100644
index 000000000..3fe9c0ac3
--- /dev/null
+++ b/src/zennomad/nomadprovisioner.cpp
@@ -0,0 +1,264 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zennomad/nomadclient.h>
+#include <zennomad/nomadprovisioner.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/process.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+
+namespace zen::nomad {
+
// Constructs the provisioner: records the configuration and the orchestrator
// endpoint that provisioned jobs will connect back to, creates the Nomad HTTP
// client, and -- only if the client initializes successfully -- starts the
// background management thread that scales jobs toward the target core count.
//
// NOTE(review): when client initialization fails the constructor logs and
// returns, leaving a live object whose management thread was never started,
// so no jobs will ever be provisioned. Callers appear to have no way to
// detect this other than the error log -- confirm that is intended.
NomadProvisioner::NomadProvisioner(const NomadConfig& Config, std::string_view OrchestratorEndpoint)
: m_Config(Config)
, m_OrchestratorEndpoint(OrchestratorEndpoint)
, m_ProcessId(static_cast<uint32_t>(zen::GetCurrentProcessId()))  // baked into job ids so ids stay unique across processes
, m_Log(zen::logging::Get("nomad.provisioner"))
{
    ZEN_DEBUG("initializing provisioner (server: {}, driver: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})",
              m_Config.ServerUrl,
              ToString(m_Config.TaskDriver),
              m_Config.MaxCores,
              m_Config.CoresPerJob,
              m_Config.MaxJobs);

    m_Client = std::make_unique<NomadClient>(m_Config);
    if (!m_Client->Initialize())
    {
        ZEN_ERROR("failed to initialize Nomad HTTP client");
        return;
    }

    ZEN_DEBUG("Nomad HTTP client initialized, starting management thread");

    // The thread only reads members initialized above, so launching it last
    // in the constructor body is safe.
    m_Thread = std::thread([this] { ManagementThread(); });
}
+
+NomadProvisioner::~NomadProvisioner()
+{
+ ZEN_DEBUG("provisioner shutting down");
+
+ m_ShouldExit.store(true);
+ m_WakeCV.notify_all();
+
+ if (m_Thread.joinable())
+ {
+ m_Thread.join();
+ }
+
+ StopAllJobs();
+
+ ZEN_DEBUG("provisioner shutdown complete");
+}
+
+void
+NomadProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+ const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores));
+ const uint32_t Previous = m_TargetCoreCount.exchange(Clamped);
+
+ if (Clamped != Previous)
+ {
+ ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped);
+ }
+
+ m_WakeCV.notify_all();
+}
+
+NomadProvisioningStats
+NomadProvisioner::GetStats() const
+{
+ NomadProvisioningStats Stats;
+ Stats.TargetCoreCount = m_TargetCoreCount.load();
+ Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
+ Stats.RunningJobCount = m_RunningJobCount.load();
+
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size());
+ }
+
+ return Stats;
+}
+
+std::string
+NomadProvisioner::GenerateJobId()
+{
+ const uint32_t Index = m_JobIndex.fetch_add(1);
+
+ ExtendableStringBuilder<128> Builder;
+ Builder << m_Config.JobPrefix << "-" << m_ProcessId << "-" << Index;
+ return std::string(Builder.ToView());
+}
+
+void
+NomadProvisioner::ManagementThread()
+{
+ ZEN_TRACE_CPU("Nomad_Mgmt");
+ zen::SetCurrentThreadName("nomad_mgmt");
+
+ ZEN_INFO("Nomad management thread started");
+
+ while (!m_ShouldExit.load())
+ {
+ ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}",
+ m_TargetCoreCount.load(),
+ m_EstimatedCoreCount.load(),
+ m_RunningJobCount.load(),
+ [this] {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ return m_Jobs.size();
+ }());
+
+ SubmitNewJobs();
+ PollExistingJobs();
+ CleanupDeadJobs();
+
+ // Wait up to 5 seconds or until woken
+ std::unique_lock<std::mutex> Lock(m_WakeMutex);
+ m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); });
+ }
+
+ ZEN_INFO("Nomad management thread exiting");
+}
+
+void
+NomadProvisioner::SubmitNewJobs()
+{
+ ZEN_TRACE_CPU("NomadProvisioner::SubmitNewJobs");
+
+ const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob);
+
+ while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+ {
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs)
+ {
+ ZEN_INFO("Nomad max jobs limit reached ({})", m_Config.MaxJobs);
+ break;
+ }
+ }
+
+ if (m_ShouldExit.load())
+ {
+ break;
+ }
+
+ const std::string JobId = GenerateJobId();
+
+ ZEN_DEBUG("submitting job '{}' (estimated: {}, target: {})", JobId, m_EstimatedCoreCount.load(), m_TargetCoreCount.load());
+
+ const std::string JobJson = m_Client->BuildJobJson(JobId, m_OrchestratorEndpoint);
+
+ NomadJobInfo JobInfo;
+ JobInfo.Id = JobId;
+
+ if (!m_Client->SubmitJob(JobJson, JobInfo))
+ {
+ ZEN_WARN("failed to submit Nomad job '{}'", JobId);
+ break;
+ }
+
+ TrackedJob Tracked;
+ Tracked.JobId = JobId;
+ Tracked.Status = "pending";
+ Tracked.Cores = static_cast<int>(CoresPerJob);
+
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ m_Jobs.push_back(std::move(Tracked));
+ }
+
+ m_EstimatedCoreCount.fetch_add(CoresPerJob);
+
+ ZEN_INFO("Nomad job '{}' submitted (estimated cores: {})", JobId, m_EstimatedCoreCount.load());
+ }
+}
+
+void
+NomadProvisioner::PollExistingJobs()
+{
+ ZEN_TRACE_CPU("NomadProvisioner::PollExistingJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (auto& Job : m_Jobs)
+ {
+ if (m_ShouldExit.load())
+ {
+ break;
+ }
+
+ NomadJobInfo Info;
+ if (!m_Client->GetJobStatus(Job.JobId, Info))
+ {
+ ZEN_DEBUG("failed to poll status for job '{}'", Job.JobId);
+ continue;
+ }
+
+ const std::string PrevStatus = Job.Status;
+ Job.Status = Info.Status;
+
+ if (PrevStatus != Job.Status)
+ {
+ ZEN_INFO("Nomad job '{}' status changed: {} -> {}", Job.JobId, PrevStatus, Job.Status);
+
+ if (Job.Status == "running" && PrevStatus != "running")
+ {
+ m_RunningJobCount.fetch_add(1);
+ }
+ else if (Job.Status != "running" && PrevStatus == "running")
+ {
+ m_RunningJobCount.fetch_sub(1);
+ }
+ }
+ }
+}
+
+void
+NomadProvisioner::CleanupDeadJobs()
+{
+ ZEN_TRACE_CPU("NomadProvisioner::CleanupDeadJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (auto It = m_Jobs.begin(); It != m_Jobs.end();)
+ {
+ if (It->Status == "dead")
+ {
+ ZEN_INFO("Nomad job '{}' is dead, removing from tracked jobs", It->JobId);
+ m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores));
+ It = m_Jobs.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+NomadProvisioner::StopAllJobs()
+{
+ ZEN_TRACE_CPU("NomadProvisioner::StopAllJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (const auto& Job : m_Jobs)
+ {
+ ZEN_INFO("stopping Nomad job '{}' during shutdown", Job.JobId);
+ m_Client->StopJob(Job.JobId);
+ }
+
+ m_Jobs.clear();
+ m_EstimatedCoreCount.store(0);
+ m_RunningJobCount.store(0);
+}
+
+} // namespace zen::nomad