aboutsummaryrefslogtreecommitdiff
path: root/src/zenovermind
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-14 16:18:23 +0200
committerStefan Boberg <[email protected]>2026-04-14 16:18:23 +0200
commit053b7373357d2555bac111b94c6909bc148f24ac (patch)
tree456a8ce2a1b38ff6aef342324f7fa4c17fdadd30 /src/zenovermind
parent5.8.4 (diff)
downloadzen-sb/compute-overmind.tar.xz
zen-sb/compute-overmind.zip
Add Overmind provisioner alongside Horde and Nomad [sb/compute-overmind]
Introduces the zenovermind module with an HTTP client targeting the Overmind REST gateway (/v1/jobs) and a management-thread provisioner that schedules, polls, and cancels jobs following the same pattern as the existing Nomad provisioner. Wired into the compute server with full CLI options (--overmind-*), lifecycle management, and maintenance tick support behind the ZEN_WITH_OVERMIND compile flag.
Diffstat (limited to 'src/zenovermind')
-rw-r--r--src/zenovermind/include/zenovermind/overmindclient.h69
-rw-r--r--src/zenovermind/include/zenovermind/overmindconfig.h45
-rw-r--r--src/zenovermind/include/zenovermind/overmindprovisioner.h110
-rw-r--r--src/zenovermind/include/zenovermind/zenovermind.h9
-rw-r--r--src/zenovermind/overmindclient.cpp254
-rw-r--r--src/zenovermind/overmindconfig.cpp28
-rw-r--r--src/zenovermind/overmindprovisioner.cpp262
-rw-r--r--src/zenovermind/xmake.lua10
8 files changed, 787 insertions, 0 deletions
diff --git a/src/zenovermind/include/zenovermind/overmindclient.h b/src/zenovermind/include/zenovermind/overmindclient.h
new file mode 100644
index 000000000..68348b4a6
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindclient.h
@@ -0,0 +1,69 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/overmindconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <memory>
+#include <string>
+
+namespace zen {
+class HttpClient;
+}
+
+namespace zen::overmind {
+
+/** Summary of an Overmind job returned by the REST API.
+ *
+ * Both fields are read from the "job" object of a JSON response
+ * (see OvermindClient::ScheduleJob and OvermindClient::GetJobStatus). */
+struct OvermindJobInfo
+{
+ std::string Id; ///< Value of the "id" field of the returned "job" object
+ std::string Status; ///< Value of the "status" field: "STATUS_PENDING", "STATUS_RUNNING", "STATUS_COMPLETE", "STATUS_ERROR", ...
+};
+
+/** HTTP client for the Overmind REST gateway (v1).
+ *
+ * Handles job scheduling, status polling, and job cancellation via the
+ * grpc-gateway REST endpoints on port 2580.
+ *
+ * Every request carries an "Authorization: Bearer <token>" header when
+ * OvermindConfig::AuthToken is non-empty.
+ *
+ * All calls are synchronous. Thread safety: individual methods are
+ * not thread-safe; callers must synchronize access.
+ */
+class OvermindClient
+{
+public:
+ explicit OvermindClient(const OvermindConfig& Config);
+ ~OvermindClient();
+
+ OvermindClient(const OvermindClient&) = delete;
+ OvermindClient& operator=(const OvermindClient&) = delete;
+
+ /** Create the underlying HTTP client for the configured ServerUrl (a trailing
+ * slash is appended when missing). Must be called before other methods. */
+ bool Initialize();
+
+ /** Build the JSON body for a ScheduleJob request.
+ *
+ * The task runs the configured CommandRef with the arguments "compute",
+ * "--http=asio" and "--instance-id=overmind-<JobName>". The arguments
+ * "--coordinator-endpoint=", "--coordinator-session=" and "--tracehost=" are
+ * added only when the corresponding parameter is non-empty, and "--clean"
+ * only when CleanStart is true. */
+ std::string BuildJobJson(const std::string& JobName,
+ const std::string& OrchestratorEndpoint,
+ const std::string& CoordinatorSession = {},
+ bool CleanStart = false,
+ const std::string& TraceHost = {}) const;
+
+ /** Schedule a job via POST /v1/jobs. On success, populates OutJob.
+ * Returns false on a transport error, a non-success HTTP status, an
+ * unparsable JSON response, or a response without a job ID. */
+ bool ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob);
+
+ /** Get the status of a job via GET /v1/jobs/{jobId}.
+ * A 404 response is reported as success with OutJob.Status set to "STATUS_ERROR". */
+ bool GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob);
+
+ /** Cancel a job via DELETE /v1/jobs/{jobId}; the configured namespace is
+ * passed as the "namespace" query parameter. */
+ bool CancelJob(const std::string& JobId);
+
+ LoggerRef Log() { return m_Log; }
+
+private:
+ OvermindConfig m_Config; ///< Copy of the configuration passed to the constructor
+ std::unique_ptr<zen::HttpClient> m_Http; ///< Created by Initialize(); null until then
+ LoggerRef m_Log; ///< "overmind.client" logger
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/overmindconfig.h b/src/zenovermind/include/zenovermind/overmindconfig.h
new file mode 100644
index 000000000..a463e31ea
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindconfig.h
@@ -0,0 +1,45 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/zenovermind.h>
+
+#include <string>
+
+namespace zen::overmind {
+
+/** Configuration for Overmind worker provisioning.
+ *
+ * Specifies the Overmind REST gateway URL, authentication, namespace,
+ * region, and resource limits. Used by OvermindClient and OvermindProvisioner.
+ */
+struct OvermindConfig
+{
+ bool Enabled = false; ///< Whether Overmind provisioning is active
+ std::string ServerUrl; ///< Overmind REST gateway URL (e.g. "http://localhost:2580")
+ std::string AuthToken; ///< JWT bearer token for authentication; no Authorization header is sent when empty
+
+ std::string Namespace; ///< Overmind namespace for job submission
+ std::string Region; ///< Target region (e.g. "REGION_US_EAST")
+
+ /** Overmind command reference for the zenserver binary in
+ * "namespace:name:version" format (e.g. "infra:zenserver:v1.0.0"). */
+ std::string CommandRef;
+
+ std::string Os = "OPERATING_SYSTEM_LINUX"; ///< Target operating system
+ std::string Arch = "CPU_ARCHITECTURE_X86_64"; ///< Target CPU architecture
+
+ std::string Memory = "4GiB"; ///< Memory per task
+ std::string Cpu = "2000m"; ///< CPU per task (millicores)
+
+ int MaxJobs = 64; ///< Maximum concurrent Overmind jobs
+ int CoresPerJob = 32; ///< Estimated cores per job (for scaling calculations)
+ int MaxCores = 2048; ///< Maximum total cores to provision (upper bound for the provisioner's target)
+
+ std::string JobName = "zenserver-worker"; ///< Base name for generated Overmind jobs; the provisioner appends "-<index>"
+
+ /** Validate the configuration. Returns false if ServerUrl, Namespace, or CommandRef is empty. */
+ bool Validate() const;
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/overmindprovisioner.h b/src/zenovermind/include/zenovermind/overmindprovisioner.h
new file mode 100644
index 000000000..cb0a84728
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindprovisioner.h
@@ -0,0 +1,110 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/overmindconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <vector>
+
+namespace zen::overmind {
+
+class OvermindClient;
+
+/** Snapshot of the current Overmind provisioning state, returned by OvermindProvisioner::GetStats(). */
+struct OvermindProvisioningStats
+{
+ uint32_t TargetCoreCount = 0; ///< Requested number of cores (clamped to MaxCores)
+ uint32_t EstimatedCoreCount = 0; ///< Cores expected from submitted jobs (CoresPerJob for each tracked job)
+ uint32_t ActiveJobCount = 0; ///< Number of currently tracked Overmind jobs
+ uint32_t RunningJobCount = 0; ///< Number of tracked jobs whose last polled status is "STATUS_RUNNING"
+};
+
+/** Job lifecycle manager for Overmind worker provisioning.
+ *
+ * Provisions remote compute workers by scheduling jobs via the Overmind
+ * REST gateway. Each job runs zenserver in compute mode, which
+ * announces itself back to the orchestrator.
+ *
+ * Uses a single management thread that periodically:
+ * 1. Submits new jobs when estimated cores < target cores
+ * 2. Polls existing jobs for status changes
+ * 3. Cleans up completed/failed jobs and adjusts counters
+ *
+ * Thread safety: SetTargetCoreCount and GetStats may be called from any thread.
+ */
+class OvermindProvisioner
+{
+public:
+ /** Construct a provisioner and, if the HTTP client initializes, start the management thread.
+ * @param Config Overmind connection and job configuration.
+ * @param OrchestratorEndpoint URL of the orchestrator that remote workers announce to.
+ * @param CoordinatorSession Forwarded to each job as "--coordinator-session="; omitted when empty.
+ * @param CleanStart When true, each job is started with "--clean".
+ * @param TraceHost Forwarded to each job as "--tracehost="; omitted when empty. */
+ OvermindProvisioner(const OvermindConfig& Config,
+ std::string_view OrchestratorEndpoint,
+ std::string_view CoordinatorSession = {},
+ bool CleanStart = false,
+ std::string_view TraceHost = {});
+
+ /** Signals the management thread to exit and cancels all tracked jobs. */
+ ~OvermindProvisioner();
+
+ OvermindProvisioner(const OvermindProvisioner&) = delete;
+ OvermindProvisioner& operator=(const OvermindProvisioner&) = delete;
+
+ /** Set the target number of cores to provision.
+ * Clamped to OvermindConfig::MaxCores. The management thread will
+ * schedule new jobs to approach this target. */
+ void SetTargetCoreCount(uint32_t Count);
+
+ /** Return a snapshot of the current provisioning counters. */
+ OvermindProvisioningStats GetStats() const;
+
+private:
+ LoggerRef Log() { return m_Log; }
+
+ struct TrackedJob
+ {
+ std::string Id; ///< Job ID returned by the schedule request
+ std::string Status; ///< Overmind status string
+ int Cores = 0; ///< Cores attributed to this job (CoresPerJob at submission time)
+ };
+
+ void ManagementThread(); ///< Thread body: submit, poll, clean up, then sleep
+ void SubmitNewJobs(); ///< Schedules jobs while estimated cores < target cores
+ void PollExistingJobs(); ///< Refreshes the status of every tracked job
+ void CleanupFinishedJobs(); ///< Drops jobs in "STATUS_COMPLETE" / "STATUS_ERROR"
+ void CancelAllJobs(); ///< Cancels and forgets every tracked job (used on shutdown)
+
+ OvermindConfig m_Config;
+ std::string m_OrchestratorEndpoint;
+ std::string m_CoordinatorSession;
+ bool m_CleanStart = false;
+ std::string m_TraceHost;
+
+ std::unique_ptr<OvermindClient> m_Client;
+
+ mutable std::mutex m_JobsLock; ///< Guards m_Jobs
+ std::vector<TrackedJob> m_Jobs;
+ std::atomic<uint32_t> m_JobIndex{0}; ///< Next numeric suffix for generated job names
+
+ std::atomic<uint32_t> m_TargetCoreCount{0};
+ std::atomic<uint32_t> m_EstimatedCoreCount{0};
+ std::atomic<uint32_t> m_RunningJobCount{0};
+
+ std::thread m_Thread; ///< Management thread; not started if client initialization fails
+ std::mutex m_WakeMutex; ///< Paired with m_WakeCV for the management thread's timed wait
+ std::condition_variable m_WakeCV;
+ std::atomic<bool> m_ShouldExit{false}; ///< Set by the destructor to stop the management thread
+
+ LoggerRef m_Log;
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/zenovermind.h b/src/zenovermind/include/zenovermind/zenovermind.h
new file mode 100644
index 000000000..b7f451a16
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/zenovermind.h
@@ -0,0 +1,9 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_OVERMIND)
+# define ZEN_WITH_OVERMIND 1
+#endif
diff --git a/src/zenovermind/overmindclient.cpp b/src/zenovermind/overmindclient.cpp
new file mode 100644
index 000000000..6d01437bf
--- /dev/null
+++ b/src/zenovermind/overmindclient.cpp
@@ -0,0 +1,254 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindclient.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zencore/memoryview.h>
+#include <zencore/trace.h>
+#include <zenhttp/httpclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::overmind {
+
+namespace {
+
+    /** Build the request headers shared by every Overmind call.
+     *
+     * Returns an empty map when no token is configured; otherwise the map
+     * holds a single "Authorization: Bearer <token>" entry. */
+    HttpClient::KeyValueMap MakeOvermindHeaders(const OvermindConfig& Config)
+    {
+        HttpClient::KeyValueMap Result;
+
+        if (Config.AuthToken.empty())
+        {
+            return Result;
+        }
+
+        Result->emplace("Authorization", "Bearer " + Config.AuthToken);
+        return Result;
+    }
+
+} // namespace
+
+/** Stores a copy of the configuration and binds the "overmind.client" logger.
+ *  The HTTP client itself is created later, by Initialize(). */
+OvermindClient::OvermindClient(const OvermindConfig& Config)
+: m_Config(Config)
+, m_Log(zen::logging::Get("overmind.client"))
+{
+}
+
+OvermindClient::~OvermindClient() = default;
+
+/** Create the underlying HTTP client for the configured gateway URL.
+ *
+ * @return false when no server URL is configured (no client is created in
+ *         that case), true once the client has been constructed.
+ */
+bool
+OvermindClient::Initialize()
+{
+    ZEN_TRACE_CPU("OvermindClient::Initialize");
+
+    // Without a base URL every request path would be resolved against nothing;
+    // report that through the return value instead of failing on each call.
+    if (m_Config.ServerUrl.empty())
+    {
+        ZEN_ERROR("Overmind server URL is empty, cannot initialize HTTP client");
+        return false;
+    }
+
+    HttpClientSettings Settings;
+    Settings.LogCategory    = "overmind.http";
+    Settings.ConnectTimeout = std::chrono::milliseconds{10000};
+    Settings.Timeout        = std::chrono::milliseconds{60000};
+    Settings.RetryCount     = 1;
+
+    // Ensure the base URL ends with a slash so path concatenation works correctly
+    std::string BaseUrl = m_Config.ServerUrl;
+    if (BaseUrl.back() != '/')
+    {
+        BaseUrl += '/';
+    }
+
+    m_Http = std::make_unique<zen::HttpClient>(BaseUrl, Settings);
+
+    return true;
+}
+
+/** Serialize a ScheduleJob request body.
+ *
+ * The single task runs the configured command with "compute", "--http=asio"
+ * and an instance id derived from JobName; the coordinator endpoint,
+ * coordinator session and trace host arguments are emitted only when their
+ * value is non-empty, and "--clean" only when CleanStart is set.
+ */
+std::string
+OvermindClient::BuildJobJson(const std::string& JobName,
+                             const std::string& OrchestratorEndpoint,
+                             const std::string& CoordinatorSession,
+                             bool               CleanStart,
+                             const std::string& TraceHost) const
+{
+    ZEN_TRACE_CPU("OvermindClient::BuildJobJson");
+
+    // Command line handed to the zenserver compute command
+    json11::Json::array Args;
+
+    // Appends "<Prefix><Value>" as one argument
+    const auto AppendOption = [&Args](const char* Prefix, const std::string& Value) { Args.push_back(std::string(Prefix) + Value); };
+
+    Args.push_back("compute");
+    Args.push_back("--http=asio");
+
+    if (!OrchestratorEndpoint.empty())
+    {
+        AppendOption("--coordinator-endpoint=", OrchestratorEndpoint);
+    }
+
+    AppendOption("--instance-id=overmind-", JobName);
+
+    if (!CoordinatorSession.empty())
+    {
+        AppendOption("--coordinator-session=", CoordinatorSession);
+    }
+
+    if (CleanStart)
+    {
+        Args.push_back("--clean");
+    }
+
+    if (!TraceHost.empty())
+    {
+        AppendOption("--tracehost=", TraceHost);
+    }
+
+    const json11::Json Resources = json11::Json::object{
+        {"memory", m_Config.Memory},
+        {"cpu", m_Config.Cpu},
+    };
+
+    const json11::Json Task = json11::Json::object{
+        {"name", "zenserver"},
+        {"type", "TASK_TYPE_MAIN"},
+        {"command", m_Config.CommandRef},
+        {"args", Args},
+        {"resources", Resources},
+    };
+
+    const json11::Json Definition = json11::Json::object{
+        {"name", JobName},
+        {"os", m_Config.Os},
+        {"arch", m_Config.Arch},
+        {"tasks", json11::Json::array{Task}},
+    };
+
+    const json11::Json Body = json11::Json::object{
+        {"namespace", m_Config.Namespace},
+        {"region", m_Config.Region},
+        {"definition", Definition},
+    };
+
+    return Body.dump();
+}
+
+/** POST the prepared job JSON to "v1/jobs" and read back the job's id and status.
+ *
+ * @return true only when the request succeeded, the response parsed as JSON
+ *         and its "job" object carried a non-empty "id".
+ */
+bool
+OvermindClient::ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("OvermindClient::ScheduleJob");
+
+    const MemoryView JsonView{JobJson.data(), JobJson.size()};
+    const IoBuffer   RequestBody = IoBufferBuilder::MakeFromMemory(JsonView, ZenContentType::kJSON);
+
+    const HttpClient::Response Reply = m_Http->Post("v1/jobs", RequestBody, MakeOvermindHeaders(m_Config));
+
+    // Transport-level failure (no usable HTTP response)
+    if (Reply.Error)
+    {
+        ZEN_WARN("Overmind job schedule failed: {}", Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    if (!Reply.IsSuccess())
+    {
+        ZEN_WARN("Overmind job schedule failed with HTTP/{}", static_cast<int>(Reply.StatusCode));
+        return false;
+    }
+
+    std::string        ParseError;
+    const std::string  ReplyText(Reply.AsText());
+    const json11::Json Parsed = json11::Json::parse(ReplyText, ParseError);
+
+    if (!ParseError.empty())
+    {
+        ZEN_WARN("invalid JSON response from Overmind job schedule: {}", ParseError);
+        return false;
+    }
+
+    const json11::Json& JobNode = Parsed["job"];
+    OutJob.Id                   = JobNode["id"].string_value();
+    OutJob.Status               = JobNode["status"].string_value();
+
+    // A job without an id cannot be polled or cancelled later
+    if (OutJob.Id.empty())
+    {
+        ZEN_WARN("Overmind job schedule response missing job ID");
+        return false;
+    }
+
+    ZEN_INFO("Overmind job scheduled: id={}", OutJob.Id);
+
+    return true;
+}
+
+/** Query "v1/jobs/{JobId}" and report the job's current status.
+ *
+ * A 404 is reported as success with OutJob.Status set to "STATUS_ERROR".
+ * Any other failure returns false and leaves OutJob untouched: a transport
+ * error, a non-success HTTP status, unparsable JSON, or a response without
+ * a status string.
+ *
+ * @param JobId  ID of the job to query.
+ * @param OutJob Receives the id and status from the response's "job" object.
+ */
+bool
+OvermindClient::GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("OvermindClient::GetJobStatus");
+
+    ExtendableStringBuilder<128> Path;
+    Path << "v1/jobs/" << JobId;
+
+    const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeOvermindHeaders(m_Config));
+
+    if (Response.Error)
+    {
+        ZEN_WARN("Overmind job status query failed for '{}': {}", JobId, Response.Error->ErrorMessage);
+        return false;
+    }
+
+    const int StatusCode = static_cast<int>(Response.StatusCode);
+
+    if (StatusCode == 404)
+    {
+        ZEN_INFO("Overmind job '{}' not found", JobId);
+        OutJob.Status = "STATUS_ERROR";
+        return true;
+    }
+
+    if (!Response.IsSuccess())
+    {
+        ZEN_WARN("Overmind job status query failed with HTTP/{}", StatusCode);
+        return false;
+    }
+
+    const std::string  Body(Response.AsText());
+    std::string        Err;
+    const json11::Json Json = json11::Json::parse(Body, Err);
+
+    if (!Err.empty())
+    {
+        ZEN_WARN("invalid JSON in Overmind job status response: {}", Err);
+        return false;
+    }
+
+    const auto& Job = Json["job"];
+
+    // string_value() yields "" when the field is absent or not a string. Reporting
+    // that as a successful poll would make callers record an empty status, so
+    // treat it as a failed query instead.
+    std::string Status = Job["status"].string_value();
+    if (Status.empty())
+    {
+        ZEN_WARN("Overmind job status response for '{}' is missing a status", JobId);
+        return false;
+    }
+
+    OutJob.Id     = Job["id"].string_value();
+    OutJob.Status = std::move(Status);
+
+    return true;
+}
+
+/** Issue DELETE "v1/jobs/{JobId}?namespace=<configured namespace>".
+ *
+ * @return true when the gateway answered with a success status.
+ */
+bool
+OvermindClient::CancelJob(const std::string& JobId)
+{
+    ZEN_TRACE_CPU("OvermindClient::CancelJob");
+
+    ExtendableStringBuilder<256> Url;
+    Url << "v1/jobs/" << JobId << "?namespace=" << m_Config.Namespace;
+
+    const HttpClient::Response Reply = m_Http->Delete(Url.ToView(), MakeOvermindHeaders(m_Config));
+
+    // Transport-level failure (no usable HTTP response)
+    if (Reply.Error)
+    {
+        ZEN_WARN("Overmind job cancel failed for '{}': {}", JobId, Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    if (Reply.IsSuccess())
+    {
+        ZEN_INFO("Overmind job '{}' cancelled", JobId);
+        return true;
+    }
+
+    ZEN_WARN("Overmind job cancel failed with HTTP/{}", static_cast<int>(Reply.StatusCode));
+    return false;
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/overmindconfig.cpp b/src/zenovermind/overmindconfig.cpp
new file mode 100644
index 000000000..63e955725
--- /dev/null
+++ b/src/zenovermind/overmindconfig.cpp
@@ -0,0 +1,28 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindconfig.h>
+
+namespace zen::overmind {
+
+/** A configuration is valid when the server URL, the namespace and the
+ *  command reference are all non-empty. */
+bool
+OvermindConfig::Validate() const
+{
+    const bool HasServerUrl  = !ServerUrl.empty();
+    const bool HasNamespace  = !Namespace.empty();
+    const bool HasCommandRef = !CommandRef.empty();
+
+    return HasServerUrl && HasNamespace && HasCommandRef;
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/overmindprovisioner.cpp b/src/zenovermind/overmindprovisioner.cpp
new file mode 100644
index 000000000..9464fbaed
--- /dev/null
+++ b/src/zenovermind/overmindprovisioner.cpp
@@ -0,0 +1,262 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindclient.h>
+#include <zenovermind/overmindprovisioner.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+
+namespace zen::overmind {
+
+/** Copies the configuration and job parameters, creates the Overmind client
+ *  and, if the client initializes, launches the management thread. When
+ *  initialization fails the provisioner stays idle (no thread is started). */
+OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config,
+                                         std::string_view      OrchestratorEndpoint,
+                                         std::string_view      CoordinatorSession,
+                                         bool                  CleanStart,
+                                         std::string_view      TraceHost)
+: m_Config(Config)
+, m_OrchestratorEndpoint(OrchestratorEndpoint)
+, m_CoordinatorSession(CoordinatorSession)
+, m_CleanStart(CleanStart)
+, m_TraceHost(TraceHost)
+, m_Log(zen::logging::Get("overmind.provisioner"))
+{
+    ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})",
+              m_Config.ServerUrl,
+              m_Config.Namespace,
+              m_Config.MaxCores,
+              m_Config.CoresPerJob,
+              m_Config.MaxJobs);
+
+    m_Client = std::make_unique<OvermindClient>(m_Config);
+
+    const bool ClientReady = m_Client->Initialize();
+    if (ClientReady)
+    {
+        ZEN_DEBUG("Overmind HTTP client initialized, starting management thread");
+        m_Thread = std::thread(&OvermindProvisioner::ManagementThread, this);
+    }
+    else
+    {
+        ZEN_ERROR("failed to initialize Overmind HTTP client");
+    }
+}
+
+/** Stops the management thread, waits for it to finish, then cancels every
+ *  job that is still tracked. */
+OvermindProvisioner::~OvermindProvisioner()
+{
+    ZEN_DEBUG("provisioner shutting down");
+
+    {
+        // The management thread tests m_ShouldExit while holding m_WakeMutex right
+        // before it blocks on the condition variable. Setting the flag under the
+        // same mutex guarantees the thread either sees the flag or is already
+        // waiting when notify_all() runs, so the wake-up cannot be lost and
+        // shutdown does not stall for the remainder of the wait timeout.
+        std::lock_guard<std::mutex> Lock(m_WakeMutex);
+        m_ShouldExit.store(true);
+    }
+    m_WakeCV.notify_all();
+
+    if (m_Thread.joinable())
+    {
+        m_Thread.join();
+    }
+
+    // The management thread is gone, so nothing else touches the job list now
+    CancelAllJobs();
+
+    ZEN_DEBUG("provisioner shutdown complete");
+}
+
+/** Update the number of cores the management thread should provision.
+ *
+ * @param Count Requested core count; values above the configured MaxCores
+ *              are clamped to it.
+ */
+void
+OvermindProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+    // MaxCores is a signed int: a negative value must act as "no cores allowed"
+    // rather than wrap around to a huge unsigned limit that disables the clamp.
+    const uint32_t MaxCores = m_Config.MaxCores > 0 ? static_cast<uint32_t>(m_Config.MaxCores) : 0u;
+    const uint32_t Clamped  = std::min(Count, MaxCores);
+    const uint32_t Previous = m_TargetCoreCount.exchange(Clamped);
+
+    if (Clamped != Previous)
+    {
+        ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped);
+    }
+
+    m_WakeCV.notify_all();
+}
+
+/** Collect the atomic counters and the size of the tracked-job list into a
+ *  value snapshot. The job list is read under m_JobsLock. */
+OvermindProvisioningStats
+OvermindProvisioner::GetStats() const
+{
+    OvermindProvisioningStats Snapshot;
+
+    Snapshot.TargetCoreCount    = m_TargetCoreCount.load();
+    Snapshot.EstimatedCoreCount = m_EstimatedCoreCount.load();
+    Snapshot.RunningJobCount    = m_RunningJobCount.load();
+
+    std::unique_lock<std::mutex> JobsGuard(m_JobsLock);
+    Snapshot.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size());
+    JobsGuard.unlock();
+
+    return Snapshot;
+}
+
+/** Body of the management thread.
+ *
+ * Each cycle submits new jobs, polls the tracked ones and removes finished
+ * ones, then sleeps for up to 5 seconds. The sleep ends early when shutdown
+ * is requested or when the target core count differs from the value this
+ * cycle started with.
+ */
+void
+OvermindProvisioner::ManagementThread()
+{
+    ZEN_TRACE_CPU("Overmind_Mgmt");
+    zen::SetCurrentThreadName("overmind_mgmt");
+
+    ZEN_INFO("Overmind management thread started");
+
+    while (!m_ShouldExit.load())
+    {
+        // Remember the target this cycle starts from, so the wait below can tell
+        // whether a different target has been requested in the meantime.
+        const uint32_t CycleTarget = m_TargetCoreCount.load();
+
+        ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}",
+                  CycleTarget,
+                  m_EstimatedCoreCount.load(),
+                  m_RunningJobCount.load(),
+                  [this] {
+                      std::lock_guard<std::mutex> Lock(m_JobsLock);
+                      return m_Jobs.size();
+                  }());
+
+        SubmitNewJobs();
+        PollExistingJobs();
+        CleanupFinishedJobs();
+
+        // Wait up to 5 seconds or until woken. The predicate must include the
+        // target change: with a predicate that only tests the exit flag, a
+        // notification sent after a target update is treated as spurious and the
+        // thread goes straight back to sleep for the rest of the timeout.
+        std::unique_lock<std::mutex> Lock(m_WakeMutex);
+        m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this, CycleTarget] {
+            return m_ShouldExit.load() || m_TargetCoreCount.load() != CycleTarget;
+        });
+    }
+
+    ZEN_INFO("Overmind management thread exiting");
+}
+
+/** Schedule jobs until the estimated core count reaches the target.
+ *
+ * Stops early when the MaxJobs limit is reached, when shutdown has been
+ * requested, or when a schedule request fails (the next management cycle
+ * retries). Each scheduled job adds CoresPerJob to the estimate.
+ */
+void
+OvermindProvisioner::SubmitNewJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs");
+
+    // A non-positive CoresPerJob would never raise the estimate (0) or would wrap
+    // it around as an unsigned value (negative), so the loop below would keep
+    // submitting jobs until MaxJobs is hit. Refuse to schedule instead.
+    if (m_Config.CoresPerJob <= 0)
+    {
+        if (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+        {
+            ZEN_WARN("Overmind cores per job must be positive (got {}), not scheduling jobs", m_Config.CoresPerJob);
+        }
+        return;
+    }
+
+    const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob);
+
+    while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+    {
+        {
+            std::lock_guard<std::mutex> Lock(m_JobsLock);
+            if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs)
+            {
+                ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs);
+                break;
+            }
+        }
+
+        if (m_ShouldExit.load())
+        {
+            break;
+        }
+
+        // The running index gives every generated job a distinct name
+        const uint32_t Index = m_JobIndex.fetch_add(1);
+
+        ExtendableStringBuilder<128> NameBuilder;
+        NameBuilder << m_Config.JobName << "-" << Index;
+        const std::string JobName(NameBuilder.ToView());
+
+        ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load());
+
+        const std::string JobJson =
+            m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost);
+
+        OvermindJobInfo JobInfo;
+        if (!m_Client->ScheduleJob(JobJson, JobInfo))
+        {
+            // Stop for this cycle; the next management cycle tries again
+            ZEN_WARN("failed to schedule Overmind job '{}'", JobName);
+            break;
+        }
+
+        TrackedJob Tracked;
+        Tracked.Id     = JobInfo.Id;
+        Tracked.Status = JobInfo.Status;
+        Tracked.Cores  = static_cast<int>(CoresPerJob);
+
+        {
+            std::lock_guard<std::mutex> Lock(m_JobsLock);
+            m_Jobs.push_back(std::move(Tracked));
+        }
+
+        m_EstimatedCoreCount.fetch_add(CoresPerJob);
+
+        ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load());
+    }
+}
+
+/** Refresh the status of every tracked job and keep the running-job counter in step.
+ *
+ * The status requests are synchronous HTTP calls, so they are made without
+ * holding m_JobsLock: the job IDs are copied under the lock first, and the
+ * lock is re-taken only briefly to apply each result. This keeps GetStats()
+ * callers from being blocked for the duration of a slow request. Jobs whose
+ * status query fails keep their previous status.
+ */
+void
+OvermindProvisioner::PollExistingJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs");
+
+    std::vector<std::string> JobIds;
+    {
+        std::lock_guard<std::mutex> Lock(m_JobsLock);
+        JobIds.reserve(m_Jobs.size());
+        for (const auto& Job : m_Jobs)
+        {
+            JobIds.push_back(Job.Id);
+        }
+    }
+
+    for (const std::string& JobId : JobIds)
+    {
+        if (m_ShouldExit.load())
+        {
+            break;
+        }
+
+        OvermindJobInfo Info;
+        if (!m_Client->GetJobStatus(JobId, Info))
+        {
+            ZEN_DEBUG("failed to poll status for job '{}'", JobId);
+            continue;
+        }
+
+        std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+        // Look the job up again by ID rather than relying on its position in the list
+        TrackedJob* Tracked = nullptr;
+        for (auto& Job : m_Jobs)
+        {
+            if (Job.Id == JobId)
+            {
+                Tracked = &Job;
+                break;
+            }
+        }
+
+        if (Tracked == nullptr || Tracked->Status == Info.Status)
+        {
+            continue;
+        }
+
+        const std::string PrevStatus = Tracked->Status;
+        Tracked->Status              = Info.Status;
+
+        ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Tracked->Id, PrevStatus, Tracked->Status);
+
+        // The status changed, so at most one side of the transition is "running"
+        if (Tracked->Status == "STATUS_RUNNING")
+        {
+            m_RunningJobCount.fetch_add(1);
+        }
+        else if (PrevStatus == "STATUS_RUNNING")
+        {
+            m_RunningJobCount.fetch_sub(1);
+        }
+    }
+}
+
+/** Remove every tracked job whose status is "STATUS_COMPLETE" or "STATUS_ERROR"
+ *  and subtract its cores from the estimated core count. */
+void
+OvermindProvisioner::CleanupFinishedJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs");
+
+    const auto IsFinished = [](const TrackedJob& Job) { return Job.Status == "STATUS_COMPLETE" || Job.Status == "STATUS_ERROR"; };
+
+    std::lock_guard<std::mutex> Guard(m_JobsLock);
+
+    auto Cursor = m_Jobs.begin();
+    while (Cursor != m_Jobs.end())
+    {
+        if (!IsFinished(*Cursor))
+        {
+            ++Cursor;
+            continue;
+        }
+
+        ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", Cursor->Id, Cursor->Status);
+        m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(Cursor->Cores));
+
+        // erase() hands back the element that follows the removed one
+        Cursor = m_Jobs.erase(Cursor);
+    }
+}
+
+/** Send a cancel request for every tracked job, then forget all of them and
+ *  reset the estimated-core and running-job counters to zero. Cancel failures
+ *  are not acted upon here. */
+void
+OvermindProvisioner::CancelAllJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs");
+
+    std::lock_guard<std::mutex> Guard(m_JobsLock);
+
+    for (auto It = m_Jobs.cbegin(); It != m_Jobs.cend(); ++It)
+    {
+        ZEN_INFO("cancelling Overmind job '{}' during shutdown", It->Id);
+        m_Client->CancelJob(It->Id);
+    }
+
+    m_Jobs.clear();
+
+    m_EstimatedCoreCount.store(0);
+    m_RunningJobCount.store(0);
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/xmake.lua b/src/zenovermind/xmake.lua
new file mode 100644
index 000000000..49d667f27
--- /dev/null
+++ b/src/zenovermind/xmake.lua
@@ -0,0 +1,10 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+target('zenovermind')
+ set_kind("static")
+ set_group("libs")
+ add_headerfiles("**.h")
+ add_files("**.cpp")
+ add_includedirs("include", {public=true})
+ add_deps("zencore", "zenhttp", "zenutil")
+ add_packages("json11")