aboutsummaryrefslogtreecommitdiff
path: root/src/zenovermind/overmindprovisioner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-14 16:18:23 +0200
committerStefan Boberg <[email protected]>2026-04-14 16:18:23 +0200
commit053b7373357d2555bac111b94c6909bc148f24ac (patch)
tree456a8ce2a1b38ff6aef342324f7fa4c17fdadd30 /src/zenovermind/overmindprovisioner.cpp
parent5.8.4 (diff)
downloadzen-sb/compute-overmind.tar.xz
zen-sb/compute-overmind.zip
Add Overmind provisioner alongside Horde and Nomadsb/compute-overmind
Introduces the zenovermind module with an HTTP client targeting the Overmind REST gateway (/v1/jobs) and a management-thread provisioner that schedules, polls, and cancels jobs following the same pattern as the existing Nomad provisioner. Wired into the compute server with full CLI options (--overmind-*), lifecycle management, and maintenance tick support behind the ZEN_WITH_OVERMIND compile flag.
Diffstat (limited to 'src/zenovermind/overmindprovisioner.cpp')
-rw-r--r--src/zenovermind/overmindprovisioner.cpp262
1 files changed, 262 insertions, 0 deletions
diff --git a/src/zenovermind/overmindprovisioner.cpp b/src/zenovermind/overmindprovisioner.cpp
new file mode 100644
index 000000000..9464fbaed
--- /dev/null
+++ b/src/zenovermind/overmindprovisioner.cpp
@@ -0,0 +1,262 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindclient.h>
+#include <zenovermind/overmindprovisioner.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+
+namespace zen::overmind {
+
+OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config,
+ std::string_view OrchestratorEndpoint,
+ std::string_view CoordinatorSession,
+ bool CleanStart,
+ std::string_view TraceHost)
+: m_Config(Config)
+, m_OrchestratorEndpoint(OrchestratorEndpoint)
+, m_CoordinatorSession(CoordinatorSession)
+, m_CleanStart(CleanStart)
+, m_TraceHost(TraceHost)
+, m_Log(zen::logging::Get("overmind.provisioner"))
+{
+ ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})",
+ m_Config.ServerUrl,
+ m_Config.Namespace,
+ m_Config.MaxCores,
+ m_Config.CoresPerJob,
+ m_Config.MaxJobs);
+
+ m_Client = std::make_unique<OvermindClient>(m_Config);
+ if (!m_Client->Initialize())
+ {
+ ZEN_ERROR("failed to initialize Overmind HTTP client");
+ return;
+ }
+
+ ZEN_DEBUG("Overmind HTTP client initialized, starting management thread");
+
+ m_Thread = std::thread([this] { ManagementThread(); });
+}
+
+OvermindProvisioner::~OvermindProvisioner()
+{
+ ZEN_DEBUG("provisioner shutting down");
+
+ m_ShouldExit.store(true);
+ m_WakeCV.notify_all();
+
+ if (m_Thread.joinable())
+ {
+ m_Thread.join();
+ }
+
+ CancelAllJobs();
+
+ ZEN_DEBUG("provisioner shutdown complete");
+}
+
+void
+OvermindProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+ const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores));
+ const uint32_t Previous = m_TargetCoreCount.exchange(Clamped);
+
+ if (Clamped != Previous)
+ {
+ ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped);
+ }
+
+ m_WakeCV.notify_all();
+}
+
+OvermindProvisioningStats
+OvermindProvisioner::GetStats() const
+{
+ OvermindProvisioningStats Stats;
+ Stats.TargetCoreCount = m_TargetCoreCount.load();
+ Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
+ Stats.RunningJobCount = m_RunningJobCount.load();
+
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size());
+ }
+
+ return Stats;
+}
+
+void
+OvermindProvisioner::ManagementThread()
+{
+ ZEN_TRACE_CPU("Overmind_Mgmt");
+ zen::SetCurrentThreadName("overmind_mgmt");
+
+ ZEN_INFO("Overmind management thread started");
+
+ while (!m_ShouldExit.load())
+ {
+ ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}",
+ m_TargetCoreCount.load(),
+ m_EstimatedCoreCount.load(),
+ m_RunningJobCount.load(),
+ [this] {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ return m_Jobs.size();
+ }());
+
+ SubmitNewJobs();
+ PollExistingJobs();
+ CleanupFinishedJobs();
+
+ // Wait up to 5 seconds or until woken
+ std::unique_lock<std::mutex> Lock(m_WakeMutex);
+ m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); });
+ }
+
+ ZEN_INFO("Overmind management thread exiting");
+}
+
+void
+OvermindProvisioner::SubmitNewJobs()
+{
+ ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs");
+
+ const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob);
+
+ while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+ {
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs)
+ {
+ ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs);
+ break;
+ }
+ }
+
+ if (m_ShouldExit.load())
+ {
+ break;
+ }
+
+ const uint32_t Index = m_JobIndex.fetch_add(1);
+
+ ExtendableStringBuilder<128> NameBuilder;
+ NameBuilder << m_Config.JobName << "-" << Index;
+ const std::string JobName(NameBuilder.ToView());
+
+ ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load());
+
+ const std::string JobJson =
+ m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost);
+
+ OvermindJobInfo JobInfo;
+ if (!m_Client->ScheduleJob(JobJson, JobInfo))
+ {
+ ZEN_WARN("failed to schedule Overmind job '{}'", JobName);
+ break;
+ }
+
+ TrackedJob Tracked;
+ Tracked.Id = JobInfo.Id;
+ Tracked.Status = JobInfo.Status;
+ Tracked.Cores = static_cast<int>(CoresPerJob);
+
+ {
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+ m_Jobs.push_back(std::move(Tracked));
+ }
+
+ m_EstimatedCoreCount.fetch_add(CoresPerJob);
+
+ ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load());
+ }
+}
+
+void
+OvermindProvisioner::PollExistingJobs()
+{
+ ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (auto& Job : m_Jobs)
+ {
+ if (m_ShouldExit.load())
+ {
+ break;
+ }
+
+ OvermindJobInfo Info;
+ if (!m_Client->GetJobStatus(Job.Id, Info))
+ {
+ ZEN_DEBUG("failed to poll status for job '{}'", Job.Id);
+ continue;
+ }
+
+ const std::string PrevStatus = Job.Status;
+ Job.Status = Info.Status;
+
+ if (PrevStatus != Job.Status)
+ {
+ ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Job.Id, PrevStatus, Job.Status);
+
+ if (Job.Status == "STATUS_RUNNING" && PrevStatus != "STATUS_RUNNING")
+ {
+ m_RunningJobCount.fetch_add(1);
+ }
+ else if (Job.Status != "STATUS_RUNNING" && PrevStatus == "STATUS_RUNNING")
+ {
+ m_RunningJobCount.fetch_sub(1);
+ }
+ }
+ }
+}
+
+void
+OvermindProvisioner::CleanupFinishedJobs()
+{
+ ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (auto It = m_Jobs.begin(); It != m_Jobs.end();)
+ {
+ if (It->Status == "STATUS_COMPLETE" || It->Status == "STATUS_ERROR")
+ {
+ ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", It->Id, It->Status);
+ m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores));
+ It = m_Jobs.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+OvermindProvisioner::CancelAllJobs()
+{
+ ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (const auto& Job : m_Jobs)
+ {
+ ZEN_INFO("cancelling Overmind job '{}' during shutdown", Job.Id);
+ m_Client->CancelJob(Job.Id);
+ }
+
+ m_Jobs.clear();
+ m_EstimatedCoreCount.store(0);
+ m_RunningJobCount.store(0);
+}
+
+} // namespace zen::overmind