aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/zenovermind/include/zenovermind/overmindclient.h69
-rw-r--r--src/zenovermind/include/zenovermind/overmindconfig.h45
-rw-r--r--src/zenovermind/include/zenovermind/overmindprovisioner.h110
-rw-r--r--src/zenovermind/include/zenovermind/zenovermind.h9
-rw-r--r--src/zenovermind/overmindclient.cpp254
-rw-r--r--src/zenovermind/overmindconfig.cpp28
-rw-r--r--src/zenovermind/overmindprovisioner.cpp262
-rw-r--r--src/zenovermind/xmake.lua10
-rw-r--r--src/zenserver/compute/computeserver.cpp172
-rw-r--r--src/zenserver/compute/computeserver.h18
-rw-r--r--src/zenserver/xmake.lua4
-rw-r--r--xmake.lua10
12 files changed, 991 insertions, 0 deletions
diff --git a/src/zenovermind/include/zenovermind/overmindclient.h b/src/zenovermind/include/zenovermind/overmindclient.h
new file mode 100644
index 000000000..68348b4a6
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindclient.h
@@ -0,0 +1,69 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/overmindconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <memory>
+#include <string>
+
+namespace zen {
+class HttpClient;
+}
+
+namespace zen::overmind {
+
+/** Summary of an Overmind job returned by the REST API. */
+struct OvermindJobInfo
+{
+    std::string Id;     ///< Server-assigned job identifier; empty when the response lacked one
+    std::string Status; ///< "STATUS_PENDING", "STATUS_RUNNING", "STATUS_COMPLETE", "STATUS_ERROR", ...
+};
+
+/** HTTP client for the Overmind REST gateway (v1).
+ *
+ * Handles job scheduling, status polling, and job cancellation via the
+ * grpc-gateway REST endpoints on port 2580.
+ *
+ * All calls are synchronous. Thread safety: individual methods are
+ * not thread-safe; callers must synchronize access.
+ */
+class OvermindClient
+{
+public:
+    /** Copies the configuration; the HTTP client itself is created in Initialize(). */
+    explicit OvermindClient(const OvermindConfig& Config);
+    ~OvermindClient();
+
+    // Non-copyable: owns the HTTP client exclusively.
+    OvermindClient(const OvermindClient&) = delete;
+    OvermindClient& operator=(const OvermindClient&) = delete;
+
+    /** Initialize the underlying HTTP client. Must be called before other methods. */
+    bool Initialize();
+
+    /** Build the JSON body for a ScheduleJob request.
+     * Pure string building from the configuration; does not use the HTTP
+     * client and may be called before Initialize(). */
+    std::string BuildJobJson(const std::string& JobName,
+                             const std::string& OrchestratorEndpoint,
+                             const std::string& CoordinatorSession = {},
+                             bool CleanStart = false,
+                             const std::string& TraceHost = {}) const;
+
+    /** Schedule a job via POST /v1/jobs. On success, populates OutJob. */
+    bool ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob);
+
+    /** Get the status of a job via GET /v1/jobs/{jobId}. */
+    bool GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob);
+
+    /** Cancel a job via DELETE /v1/jobs/{jobId}. */
+    bool CancelJob(const std::string& JobId);
+
+    LoggerRef Log() { return m_Log; }
+
+private:
+    OvermindConfig                   m_Config; ///< Copy taken at construction
+    std::unique_ptr<zen::HttpClient> m_Http;   ///< Created by Initialize(); null before that
+    LoggerRef                        m_Log;
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/overmindconfig.h b/src/zenovermind/include/zenovermind/overmindconfig.h
new file mode 100644
index 000000000..a463e31ea
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindconfig.h
@@ -0,0 +1,45 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/zenovermind.h>
+
+#include <string>
+
+namespace zen::overmind {
+
+/** Configuration for Overmind worker provisioning.
+ *
+ * Specifies the Overmind REST gateway URL, authentication, namespace,
+ * region, and resource limits. Used by OvermindClient and OvermindProvisioner.
+ */
+struct OvermindConfig
+{
+    bool        Enabled = false; ///< Whether Overmind provisioning is active
+    std::string ServerUrl;       ///< Overmind REST gateway URL (e.g. "http://localhost:2580")
+    std::string AuthToken;       ///< JWT bearer token for authentication; empty disables the Authorization header
+
+    std::string Namespace; ///< Overmind namespace for job submission
+    std::string Region;    ///< Target region (e.g. "REGION_US_EAST")
+
+    /** Overmind command reference for the zenserver binary in
+     * "namespace:name:version" format (e.g. "infra:zenserver:v1.0.0"). */
+    std::string CommandRef;
+
+    std::string Os = "OPERATING_SYSTEM_LINUX";    ///< Target operating system
+    std::string Arch = "CPU_ARCHITECTURE_X86_64"; ///< Target CPU architecture
+
+    std::string Memory = "4GiB";  ///< Memory per task
+    std::string Cpu = "2000m";    ///< CPU per task (millicores)
+
+    int MaxJobs = 64;      ///< Maximum concurrent Overmind jobs
+    int CoresPerJob = 32;  ///< Estimated cores per job (for scaling calculations)
+    int MaxCores = 2048;   ///< Maximum total cores to provision
+
+    std::string JobName = "zenserver-worker"; ///< Name for generated Overmind jobs
+
+    /** Validate the configuration. Returns false if required fields are missing.
+     * Required fields: ServerUrl, Namespace, CommandRef. */
+    bool Validate() const;
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/overmindprovisioner.h b/src/zenovermind/include/zenovermind/overmindprovisioner.h
new file mode 100644
index 000000000..cb0a84728
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/overmindprovisioner.h
@@ -0,0 +1,110 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenovermind/overmindconfig.h>
+
+#include <zencore/logbase.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace zen::overmind {
+
+class OvermindClient;
+
+/** Snapshot of the current Overmind provisioning state, returned by OvermindProvisioner::GetStats().
+ *
+ * The counters are sampled from independent atomics, so a snapshot taken
+ * while the management thread is running may be momentarily inconsistent
+ * (e.g. EstimatedCoreCount updated before ActiveJobCount).
+ */
+struct OvermindProvisioningStats
+{
+    uint32_t TargetCoreCount = 0;    ///< Requested number of cores (clamped to MaxCores)
+    uint32_t EstimatedCoreCount = 0; ///< Cores expected from submitted jobs
+    uint32_t ActiveJobCount = 0;     ///< Number of currently tracked Overmind jobs
+    uint32_t RunningJobCount = 0;    ///< Number of jobs in running status
+};
+
+/** Job lifecycle manager for Overmind worker provisioning.
+ *
+ * Provisions remote compute workers by scheduling jobs via the Overmind
+ * REST gateway. Each job runs zenserver in compute mode, which
+ * announces itself back to the orchestrator.
+ *
+ * Uses a single management thread that periodically:
+ * 1. Submits new jobs when estimated cores < target cores
+ * 2. Polls existing jobs for status changes
+ * 3. Cleans up completed/failed jobs and adjusts counters
+ *
+ * Thread safety: SetTargetCoreCount and GetStats may be called from any thread.
+ */
+class OvermindProvisioner
+{
+public:
+    /** Construct a provisioner.
+     * Starts the management thread immediately unless client initialization fails.
+     * @param Config Overmind connection and job configuration.
+     * @param OrchestratorEndpoint URL of the orchestrator that remote workers announce to. */
+    OvermindProvisioner(const OvermindConfig& Config,
+                        std::string_view      OrchestratorEndpoint,
+                        std::string_view      CoordinatorSession = {},
+                        bool                  CleanStart = false,
+                        std::string_view      TraceHost = {});
+
+    /** Signals the management thread to exit and cancels all tracked jobs. */
+    ~OvermindProvisioner();
+
+    OvermindProvisioner(const OvermindProvisioner&) = delete;
+    OvermindProvisioner& operator=(const OvermindProvisioner&) = delete;
+
+    /** Set the target number of cores to provision.
+     * Clamped to OvermindConfig::MaxCores. The management thread will
+     * schedule new jobs to approach this target. */
+    void SetTargetCoreCount(uint32_t Count);
+
+    /** Return a snapshot of the current provisioning counters. */
+    OvermindProvisioningStats GetStats() const;
+
+private:
+    LoggerRef Log() { return m_Log; }
+
+    /** One Overmind job submitted by this provisioner and not yet retired. */
+    struct TrackedJob
+    {
+        std::string Id;
+        std::string Status;  ///< Overmind status string
+        int         Cores = 0; ///< Core estimate charged against m_EstimatedCoreCount
+    };
+
+    void ManagementThread();
+    void SubmitNewJobs();
+    void PollExistingJobs();
+    void CleanupFinishedJobs();
+    void CancelAllJobs();
+
+    OvermindConfig m_Config;
+    std::string    m_OrchestratorEndpoint;
+    std::string    m_CoordinatorSession;
+    bool           m_CleanStart = false;
+    std::string    m_TraceHost;
+
+    std::unique_ptr<OvermindClient> m_Client;
+
+    mutable std::mutex      m_JobsLock; ///< Guards m_Jobs
+    std::vector<TrackedJob> m_Jobs;     ///< Jobs submitted and not yet retired
+    std::atomic<uint32_t>   m_JobIndex{0}; ///< Monotonic suffix for generated job names
+
+    std::atomic<uint32_t> m_TargetCoreCount{0};
+    std::atomic<uint32_t> m_EstimatedCoreCount{0};
+    std::atomic<uint32_t> m_RunningJobCount{0};
+
+    // Management-thread lifecycle: the thread sleeps on m_WakeCV between
+    // cycles and is woken for shutdown via m_ShouldExit.
+    std::thread             m_Thread;
+    std::mutex              m_WakeMutex;
+    std::condition_variable m_WakeCV;
+    std::atomic<bool>       m_ShouldExit{false};
+
+    LoggerRef m_Log;
+};
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/include/zenovermind/zenovermind.h b/src/zenovermind/include/zenovermind/zenovermind.h
new file mode 100644
index 000000000..b7f451a16
--- /dev/null
+++ b/src/zenovermind/include/zenovermind/zenovermind.h
@@ -0,0 +1,9 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+// ZEN_WITH_OVERMIND is normally injected by the build system (the
+// 'zenovermind' xmake option); default to enabled so these headers remain
+// usable when no definition was provided.
+#if !defined(ZEN_WITH_OVERMIND)
+#    define ZEN_WITH_OVERMIND 1
+#endif
diff --git a/src/zenovermind/overmindclient.cpp b/src/zenovermind/overmindclient.cpp
new file mode 100644
index 000000000..6d01437bf
--- /dev/null
+++ b/src/zenovermind/overmindclient.cpp
@@ -0,0 +1,254 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindclient.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zencore/memoryview.h>
+#include <zencore/trace.h>
+#include <zenhttp/httpclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::overmind {
+
+namespace {
+
+    /** Build the default header set for an Overmind request, attaching a
+     * Bearer Authorization header when an auth token is configured.
+     *
+     * NOTE(review): the map is accessed through operator->, so
+     * HttpClient::KeyValueMap appears to be a pointer-like wrapper that
+     * materializes the underlying container on first access — confirm
+     * against zenhttp/httpclient.h.
+     */
+    HttpClient::KeyValueMap MakeOvermindHeaders(const OvermindConfig& Config)
+    {
+        HttpClient::KeyValueMap Headers;
+        if (!Config.AuthToken.empty())
+        {
+            Headers->emplace("Authorization", "Bearer " + Config.AuthToken);
+        }
+        return Headers;
+    }
+
+} // namespace
+
+// Copies the configuration up front; the HTTP client is created lazily in Initialize().
+OvermindClient::OvermindClient(const OvermindConfig& Config) : m_Config(Config), m_Log(zen::logging::Get("overmind.client"))
+{
+}
+
+// Defined out-of-line so the std::unique_ptr<HttpClient> member destructs with a complete type.
+OvermindClient::~OvermindClient() = default;
+
+bool
+OvermindClient::Initialize()
+{
+    ZEN_TRACE_CPU("OvermindClient::Initialize");
+
+    // A client without a base URL can never issue a valid request. Fail early
+    // instead of building an HTTP client that fails on first use; previously
+    // this function declared a bool result but could never return false.
+    if (m_Config.ServerUrl.empty())
+    {
+        ZEN_WARN("Overmind client initialization failed: no server URL configured");
+        return false;
+    }
+
+    // Generous timeouts: job scheduling can be slow on a busy gateway.
+    HttpClientSettings Settings;
+    Settings.LogCategory = "overmind.http";
+    Settings.ConnectTimeout = std::chrono::milliseconds{10000};
+    Settings.Timeout = std::chrono::milliseconds{60000};
+    Settings.RetryCount = 1;
+
+    // Ensure the base URL ends with a slash so path concatenation works correctly
+    std::string BaseUrl = m_Config.ServerUrl;
+    if (BaseUrl.back() != '/')
+    {
+        BaseUrl += '/';
+    }
+
+    m_Http = std::make_unique<zen::HttpClient>(BaseUrl, Settings);
+
+    return true;
+}
+
+std::string
+OvermindClient::BuildJobJson(const std::string& JobName,
+                             const std::string& OrchestratorEndpoint,
+                             const std::string& CoordinatorSession,
+                             bool CleanStart,
+                             const std::string& TraceHost) const
+{
+    ZEN_TRACE_CPU("OvermindClient::BuildJobJson");
+
+    // Command line for the remote zenserver compute worker. Optional flags
+    // are appended only when the corresponding value was supplied.
+    json11::Json::array Args{"compute", "--http=asio"};
+
+    if (!OrchestratorEndpoint.empty())
+    {
+        Args.push_back("--coordinator-endpoint=" + OrchestratorEndpoint);
+    }
+
+    // The "overmind-" prefix lets the worker identify its provisioner.
+    Args.push_back("--instance-id=overmind-" + JobName);
+
+    if (!CoordinatorSession.empty())
+    {
+        Args.push_back("--coordinator-session=" + CoordinatorSession);
+    }
+
+    if (CleanStart)
+    {
+        Args.push_back("--clean");
+    }
+
+    if (!TraceHost.empty())
+    {
+        Args.push_back("--tracehost=" + TraceHost);
+    }
+
+    // Single main task running zenserver with the configured resources.
+    const json11::Json Task = json11::Json::object{
+        {"name", "zenserver"},
+        {"type", "TASK_TYPE_MAIN"},
+        {"command", m_Config.CommandRef},
+        {"args", Args},
+        {"resources",
+         json11::Json::object{
+             {"memory", m_Config.Memory},
+             {"cpu", m_Config.Cpu},
+         }},
+    };
+
+    // Top-level job envelope expected by POST /v1/jobs.
+    const json11::Json Body = json11::Json::object{
+        {"namespace", m_Config.Namespace},
+        {"region", m_Config.Region},
+        {"definition",
+         json11::Json::object{
+             {"name", JobName},
+             {"os", m_Config.Os},
+             {"arch", m_Config.Arch},
+             {"tasks", json11::Json::array{Task}},
+         }},
+    };
+
+    return Body.dump();
+}
+
+/** Schedule a job via POST /v1/jobs.
+ *
+ * Returns false on transport error, non-2xx status, unparseable JSON, or a
+ * response without a job id. Note: OutJob is written before the id check, so
+ * on that last failure path it may contain partial data.
+ */
+bool
+OvermindClient::ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("OvermindClient::ScheduleJob");
+
+    const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{JobJson.data(), JobJson.size()}, ZenContentType::kJSON);
+
+    const HttpClient::Response Response = m_Http->Post("v1/jobs", Payload, MakeOvermindHeaders(m_Config));
+
+    if (Response.Error)
+    {
+        ZEN_WARN("Overmind job schedule failed: {}", Response.Error->ErrorMessage);
+        return false;
+    }
+
+    if (!Response.IsSuccess())
+    {
+        ZEN_WARN("Overmind job schedule failed with HTTP/{}", static_cast<int>(Response.StatusCode));
+        return false;
+    }
+
+    const std::string Body(Response.AsText());
+    std::string Err;
+    const json11::Json Json = json11::Json::parse(Body, Err);
+
+    if (!Err.empty())
+    {
+        ZEN_WARN("invalid JSON response from Overmind job schedule: {}", Err);
+        return false;
+    }
+
+    // Expected response shape: {"job": {"id": ..., "status": ...}}
+    const auto& Job = Json["job"];
+    OutJob.Id = Job["id"].string_value();
+    OutJob.Status = Job["status"].string_value();
+
+    // An id-less response is useless to callers that poll/cancel by id.
+    if (OutJob.Id.empty())
+    {
+        ZEN_WARN("Overmind job schedule response missing job ID");
+        return false;
+    }
+
+    ZEN_INFO("Overmind job scheduled: id={}", OutJob.Id);
+
+    return true;
+}
+
+/** Get the status of a job via GET /v1/jobs/{jobId}.
+ *
+ * A 404 is treated as success with a synthesized "STATUS_ERROR" status, so
+ * callers can retire jobs the gateway no longer knows about. Returns false
+ * on transport error, other non-2xx statuses, or unparseable JSON.
+ */
+bool
+OvermindClient::GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("OvermindClient::GetJobStatus");
+
+    ExtendableStringBuilder<128> Path;
+    Path << "v1/jobs/" << JobId;
+
+    const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeOvermindHeaders(m_Config));
+
+    if (Response.Error)
+    {
+        ZEN_WARN("Overmind job status query failed for '{}': {}", JobId, Response.Error->ErrorMessage);
+        return false;
+    }
+
+    const int StatusCode = static_cast<int>(Response.StatusCode);
+
+    if (StatusCode == 404)
+    {
+        // Job vanished server-side; report it as errored so it gets cleaned up.
+        ZEN_INFO("Overmind job '{}' not found", JobId);
+        OutJob.Status = "STATUS_ERROR";
+        return true;
+    }
+
+    if (!Response.IsSuccess())
+    {
+        ZEN_WARN("Overmind job status query failed with HTTP/{}", StatusCode);
+        return false;
+    }
+
+    const std::string Body(Response.AsText());
+    std::string Err;
+    const json11::Json Json = json11::Json::parse(Body, Err);
+
+    if (!Err.empty())
+    {
+        ZEN_WARN("invalid JSON in Overmind job status response: {}", Err);
+        return false;
+    }
+
+    // Both fields are refreshed from the response (Id echoed back by the server).
+    const auto& Job = Json["job"];
+    OutJob.Id = Job["id"].string_value();
+    OutJob.Status = Job["status"].string_value();
+
+    return true;
+}
+
+/** Cancel a job via DELETE /v1/jobs/{jobId}.
+ *
+ * Returns false on transport error or any non-2xx status.
+ *
+ * NOTE(review): only this endpoint passes the namespace as a query
+ * parameter; ScheduleJob/GetJobStatus do not — confirm which endpoints the
+ * gateway actually requires it on.
+ */
+bool
+OvermindClient::CancelJob(const std::string& JobId)
+{
+    ZEN_TRACE_CPU("OvermindClient::CancelJob");
+
+    ExtendableStringBuilder<256> Path;
+    Path << "v1/jobs/" << JobId << "?namespace=" << m_Config.Namespace;
+
+    const HttpClient::Response Response = m_Http->Delete(Path.ToView(), MakeOvermindHeaders(m_Config));
+
+    if (Response.Error)
+    {
+        ZEN_WARN("Overmind job cancel failed for '{}': {}", JobId, Response.Error->ErrorMessage);
+        return false;
+    }
+
+    if (!Response.IsSuccess())
+    {
+        ZEN_WARN("Overmind job cancel failed with HTTP/{}", static_cast<int>(Response.StatusCode));
+        return false;
+    }
+
+    ZEN_INFO("Overmind job '{}' cancelled", JobId);
+    return true;
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/overmindconfig.cpp b/src/zenovermind/overmindconfig.cpp
new file mode 100644
index 000000000..63e955725
--- /dev/null
+++ b/src/zenovermind/overmindconfig.cpp
@@ -0,0 +1,28 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindconfig.h>
+
+namespace zen::overmind {
+
+bool
+OvermindConfig::Validate() const
+{
+    // A usable configuration needs a gateway URL, a submission namespace and
+    // a command reference; every other field has a workable default.
+    const bool HasRequiredFields = !ServerUrl.empty() && !Namespace.empty() && !CommandRef.empty();
+    return HasRequiredFields;
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/overmindprovisioner.cpp b/src/zenovermind/overmindprovisioner.cpp
new file mode 100644
index 000000000..9464fbaed
--- /dev/null
+++ b/src/zenovermind/overmindprovisioner.cpp
@@ -0,0 +1,262 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenovermind/overmindclient.h>
+#include <zenovermind/overmindprovisioner.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+#include <chrono>
+
+namespace zen::overmind {
+
+/** Construct the provisioner: create and initialize the HTTP client, then
+ * start the management thread. If client initialization fails, the thread is
+ * never started and the provisioner stays inert (the destructor copes with a
+ * non-joinable thread). */
+OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config,
+                                         std::string_view      OrchestratorEndpoint,
+                                         std::string_view      CoordinatorSession,
+                                         bool                  CleanStart,
+                                         std::string_view      TraceHost)
+: m_Config(Config)
+, m_OrchestratorEndpoint(OrchestratorEndpoint)
+, m_CoordinatorSession(CoordinatorSession)
+, m_CleanStart(CleanStart)
+, m_TraceHost(TraceHost)
+, m_Log(zen::logging::Get("overmind.provisioner"))
+{
+    ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})",
+              m_Config.ServerUrl,
+              m_Config.Namespace,
+              m_Config.MaxCores,
+              m_Config.CoresPerJob,
+              m_Config.MaxJobs);
+
+    m_Client = std::make_unique<OvermindClient>(m_Config);
+    if (!m_Client->Initialize())
+    {
+        // Without a working client there is nothing the management thread
+        // could do; leave it unstarted.
+        ZEN_ERROR("failed to initialize Overmind HTTP client");
+        return;
+    }
+
+    ZEN_DEBUG("Overmind HTTP client initialized, starting management thread");
+
+    m_Thread = std::thread([this] { ManagementThread(); });
+}
+
+/** Stop the management thread and cancel every tracked job. */
+OvermindProvisioner::~OvermindProvisioner()
+{
+    ZEN_DEBUG("provisioner shutting down");
+
+    // Set the exit flag while holding the wake mutex. Storing it without the
+    // lock can race with the management thread between its predicate check
+    // and the actual wait: the notify would be lost and shutdown would stall
+    // for a full 5-second poll interval.
+    {
+        std::lock_guard<std::mutex> Lock(m_WakeMutex);
+        m_ShouldExit.store(true);
+    }
+    m_WakeCV.notify_all();
+
+    if (m_Thread.joinable())
+    {
+        m_Thread.join();
+    }
+
+    // The management thread has exited (or never started), so the job list is
+    // stable; cancel whatever is still tracked.
+    CancelAllJobs();
+
+    ZEN_DEBUG("provisioner shutdown complete");
+}
+
+/** Set the target number of cores, clamped to OvermindConfig::MaxCores.
+ * Safe to call from any thread (the target is an atomic).
+ *
+ * NOTE(review): the management thread's wait predicate only checks
+ * m_ShouldExit, so this notify wakes it briefly but it resumes waiting —
+ * a new target is effectively picked up on the next 5-second tick.
+ */
+void
+OvermindProvisioner::SetTargetCoreCount(uint32_t Count)
+{
+    const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores));
+    const uint32_t Previous = m_TargetCoreCount.exchange(Clamped);
+
+    if (Clamped != Previous)
+    {
+        ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped);
+    }
+
+    m_WakeCV.notify_all();
+}
+
+/** Snapshot the provisioning counters. Callable from any thread; only the
+ * job-list size needs the lock, the remaining counters are atomics. */
+OvermindProvisioningStats
+OvermindProvisioner::GetStats() const
+{
+    OvermindProvisioningStats Stats;
+
+    {
+        std::lock_guard<std::mutex> Lock(m_JobsLock);
+        Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size());
+    }
+
+    Stats.TargetCoreCount = m_TargetCoreCount.load();
+    Stats.EstimatedCoreCount = m_EstimatedCoreCount.load();
+    Stats.RunningJobCount = m_RunningJobCount.load();
+
+    return Stats;
+}
+
+/** Management loop: submit, poll, clean up, then sleep for up to 5 seconds.
+ * Exits when m_ShouldExit is set. Note the wait predicate only observes
+ * m_ShouldExit, so non-shutdown wakeups just shorten the sleep without
+ * triggering an early cycle. */
+void
+OvermindProvisioner::ManagementThread()
+{
+    ZEN_TRACE_CPU("Overmind_Mgmt");
+    zen::SetCurrentThreadName("overmind_mgmt");
+
+    ZEN_INFO("Overmind management thread started");
+
+    while (!m_ShouldExit.load())
+    {
+        // The lambda takes m_JobsLock to read the job count; safe here since
+        // this thread does not hold the lock at this point.
+        ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}",
+                  m_TargetCoreCount.load(),
+                  m_EstimatedCoreCount.load(),
+                  m_RunningJobCount.load(),
+                  [this] {
+                      std::lock_guard<std::mutex> Lock(m_JobsLock);
+                      return m_Jobs.size();
+                  }());
+
+        SubmitNewJobs();
+        PollExistingJobs();
+        CleanupFinishedJobs();
+
+        // Wait up to 5 seconds or until woken
+        std::unique_lock<std::mutex> Lock(m_WakeMutex);
+        m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); });
+    }
+
+    ZEN_INFO("Overmind management thread exiting");
+}
+
+/** Submit jobs until the estimated core count reaches the target.
+ * Stops early when the MaxJobs limit is hit, shutdown is requested, or a
+ * schedule call fails (the remainder is retried on the next cycle). */
+void
+OvermindProvisioner::SubmitNewJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs");
+
+    const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob);
+
+    while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load())
+    {
+        {
+            std::lock_guard<std::mutex> Lock(m_JobsLock);
+            if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs)
+            {
+                ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs);
+                break;
+            }
+        }
+
+        if (m_ShouldExit.load())
+        {
+            break;
+        }
+
+        // Unique job name: configured prefix plus a monotonically increasing index.
+        const uint32_t Index = m_JobIndex.fetch_add(1);
+
+        ExtendableStringBuilder<128> NameBuilder;
+        NameBuilder << m_Config.JobName << "-" << Index;
+        const std::string JobName(NameBuilder.ToView());
+
+        ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load());
+
+        const std::string JobJson =
+            m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost);
+
+        OvermindJobInfo JobInfo;
+        if (!m_Client->ScheduleJob(JobJson, JobInfo))
+        {
+            // Back off until the next management cycle rather than hammering
+            // a failing gateway.
+            ZEN_WARN("failed to schedule Overmind job '{}'", JobName);
+            break;
+        }
+
+        // Track the job; its core estimate is charged against the estimated
+        // count until CleanupFinishedJobs retires it.
+        TrackedJob Tracked;
+        Tracked.Id = JobInfo.Id;
+        Tracked.Status = JobInfo.Status;
+        Tracked.Cores = static_cast<int>(CoresPerJob);
+
+        {
+            std::lock_guard<std::mutex> Lock(m_JobsLock);
+            m_Jobs.push_back(std::move(Tracked));
+        }
+
+        m_EstimatedCoreCount.fetch_add(CoresPerJob);
+
+        ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load());
+    }
+}
+
+/** Poll every tracked job for a status change and keep m_RunningJobCount in
+ * sync with transitions into and out of STATUS_RUNNING. */
+void
+OvermindProvisioner::PollExistingJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs");
+
+    // Snapshot the tracked job ids so the (potentially slow) HTTP status
+    // queries run without holding m_JobsLock; holding it across the whole
+    // poll would block GetStats() callers for the duration of every request.
+    // Only this management thread mutates m_Jobs, so the snapshot cannot go
+    // stale while we poll.
+    std::vector<std::string> JobIds;
+    {
+        std::lock_guard<std::mutex> Lock(m_JobsLock);
+        JobIds.reserve(m_Jobs.size());
+        for (const TrackedJob& Job : m_Jobs)
+        {
+            JobIds.push_back(Job.Id);
+        }
+    }
+
+    for (const std::string& JobId : JobIds)
+    {
+        if (m_ShouldExit.load())
+        {
+            break;
+        }
+
+        OvermindJobInfo Info;
+        if (!m_Client->GetJobStatus(JobId, Info))
+        {
+            ZEN_DEBUG("failed to poll status for job '{}'", JobId);
+            continue;
+        }
+
+        // Re-acquire the lock only to apply the result.
+        std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+        for (TrackedJob& Job : m_Jobs)
+        {
+            if (Job.Id != JobId)
+            {
+                continue;
+            }
+
+            const std::string PrevStatus = Job.Status;
+            Job.Status = Info.Status;
+
+            if (PrevStatus != Job.Status)
+            {
+                ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Job.Id, PrevStatus, Job.Status);
+
+                if (Job.Status == "STATUS_RUNNING" && PrevStatus != "STATUS_RUNNING")
+                {
+                    m_RunningJobCount.fetch_add(1);
+                }
+                else if (Job.Status != "STATUS_RUNNING" && PrevStatus == "STATUS_RUNNING")
+                {
+                    m_RunningJobCount.fetch_sub(1);
+                }
+            }
+            break;
+        }
+    }
+}
+
+void
+OvermindProvisioner::CleanupFinishedJobs()
+{
+ ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs");
+
+ std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+ for (auto It = m_Jobs.begin(); It != m_Jobs.end();)
+ {
+ if (It->Status == "STATUS_COMPLETE" || It->Status == "STATUS_ERROR")
+ {
+ ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", It->Id, It->Status);
+ m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores));
+ It = m_Jobs.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+/** Best-effort cancellation of every tracked job, then reset all counters.
+ * Cancel failures are logged by the client and otherwise ignored. Called
+ * from the destructor after the management thread has been joined. */
+void
+OvermindProvisioner::CancelAllJobs()
+{
+    ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs");
+
+    std::lock_guard<std::mutex> Lock(m_JobsLock);
+
+    for (const auto& Job : m_Jobs)
+    {
+        ZEN_INFO("cancelling Overmind job '{}' during shutdown", Job.Id);
+        m_Client->CancelJob(Job.Id);
+    }
+
+    m_Jobs.clear();
+    m_EstimatedCoreCount.store(0);
+    m_RunningJobCount.store(0);
+}
+
+} // namespace zen::overmind
diff --git a/src/zenovermind/xmake.lua b/src/zenovermind/xmake.lua
new file mode 100644
index 000000000..49d667f27
--- /dev/null
+++ b/src/zenovermind/xmake.lua
@@ -0,0 +1,10 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+-- Static library with the Overmind worker-provisioning client, config and
+-- provisioner. Linked into zenserver when the 'zenovermind' option is on.
+target('zenovermind')
+    set_kind("static")
+    set_group("libs")
+    add_headerfiles("**.h")
+    add_files("**.cpp")
+    add_includedirs("include", {public=true})
+    add_deps("zencore", "zenhttp", "zenutil")
+    add_packages("json11")
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index f35fe0f97..89886beee 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -29,6 +29,10 @@
# include <zennomad/nomadconfig.h>
# include <zennomad/nomadprovisioner.h>
# endif
+# if ZEN_WITH_OVERMIND
+# include <zenovermind/overmindconfig.h>
+# include <zenovermind/overmindprovisioner.h>
+# endif
ZEN_THIRD_PARTY_INCLUDES_START
# include <cxxopts.hpp>
@@ -331,6 +335,107 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<std::string>(m_ServerOptions.NomadConfig.JobPrefix)->default_value("zenserver-worker"),
"");
# endif
+
+# if ZEN_WITH_OVERMIND
+ // Overmind provisioning options
+ Options.add_option("overmind",
+ "",
+ "overmind-enabled",
+ "Enable Overmind worker provisioning",
+ cxxopts::value<bool>(m_ServerOptions.OvermindConfig.Enabled)->default_value("false"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-server",
+ "Overmind REST gateway URL",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.ServerUrl)->default_value(""),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-token",
+ "Overmind JWT bearer token",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.AuthToken)->default_value(""),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-namespace",
+ "Overmind namespace for job submission",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Namespace)->default_value(""),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-region",
+ "Overmind target region (e.g. REGION_US_EAST)",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Region)->default_value(""),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-command",
+ "Overmind command reference (namespace:name:version)",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.CommandRef)->default_value(""),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-os",
+ "Target operating system for Overmind jobs",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Os)->default_value("OPERATING_SYSTEM_LINUX"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-arch",
+ "Target CPU architecture for Overmind jobs",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Arch)->default_value("CPU_ARCHITECTURE_X86_64"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-memory",
+ "Memory per Overmind task (e.g. 4GiB)",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Memory)->default_value("4GiB"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-cpu",
+ "CPU per Overmind task in millicores (e.g. 2000m)",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Cpu)->default_value("2000m"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-max-jobs",
+ "Maximum concurrent Overmind jobs",
+ cxxopts::value<int>(m_ServerOptions.OvermindConfig.MaxJobs)->default_value("64"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-cores-per-job",
+ "Estimated cores per Overmind job (for scaling)",
+ cxxopts::value<int>(m_ServerOptions.OvermindConfig.CoresPerJob)->default_value("32"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-max-cores",
+ "Maximum total cores to provision via Overmind",
+ cxxopts::value<int>(m_ServerOptions.OvermindConfig.MaxCores)->default_value("2048"),
+ "");
+
+ Options.add_option("overmind",
+ "",
+ "overmind-job-name",
+ "Name for generated Overmind jobs",
+ cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.JobName)->default_value("zenserver-worker"),
+ "");
+# endif
}
void
@@ -467,6 +572,12 @@ ZenComputeServer::Cleanup()
m_NomadProvisioner.reset();
# endif
+# if ZEN_WITH_OVERMIND
+ // Shut down Overmind provisioner - stops the management thread and
+ // cancels all tracked jobs.
+ m_OvermindProvisioner.reset();
+# endif
+
// Close the orchestrator WebSocket client before stopping the io_context
m_WsReconnectTimer.cancel();
if (m_OrchestratorWsClient)
@@ -630,6 +741,36 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
}
}
# endif
+
+# if ZEN_WITH_OVERMIND
+ // Overmind provisioner
+ if (ServerConfig.OvermindConfig.Enabled && !ServerConfig.OvermindConfig.ServerUrl.empty())
+ {
+ ZEN_INFO("instantiating Overmind provisioner (server: {})", ServerConfig.OvermindConfig.ServerUrl);
+
+ const auto& OvermindCfg = ServerConfig.OvermindConfig;
+
+ if (!OvermindCfg.Validate())
+ {
+ ZEN_ERROR("invalid Overmind configuration");
+ }
+ else
+ {
+ ExtendableStringBuilder<256> OrchestratorEndpoint;
+ OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get());
+ if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/')
+ {
+ OrchestratorEndpoint << '/';
+ }
+
+ m_OvermindProvisioner = std::make_unique<overmind::OvermindProvisioner>(OvermindCfg,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
+ }
+ }
+# endif
}
void
@@ -727,6 +868,11 @@ ZenComputeServer::BuildAnnounceBody()
AnnounceBody << "provisioner"
<< "nomad";
}
+ else if (m_InstanceId.starts_with("overmind-"))
+ {
+ AnnounceBody << "provisioner"
+ << "overmind";
+ }
if (!m_CoordinatorSession.empty())
{
@@ -901,6 +1047,18 @@ ZenComputeServer::ProvisionerMaintenanceTick()
Stats.RunningJobCount);
}
# endif
+
+# if ZEN_WITH_OVERMIND
+ if (m_OvermindProvisioner)
+ {
+ m_OvermindProvisioner->SetTargetCoreCount(UINT32_MAX);
+ auto Stats = m_OvermindProvisioner->GetStats();
+ ZEN_DEBUG("Overmind maintenance: target={}, estimated={}, running jobs={}",
+ Stats.TargetCoreCount,
+ Stats.EstimatedCoreCount,
+ Stats.RunningJobCount);
+ }
+# endif
}
void
@@ -913,6 +1071,9 @@ ZenComputeServer::EnqueueProvisionerMaintenanceTimer()
# if ZEN_WITH_NOMAD
HasProvisioner = HasProvisioner || (m_NomadProvisioner != nullptr);
# endif
+# if ZEN_WITH_OVERMIND
+ HasProvisioner = HasProvisioner || (m_OvermindProvisioner != nullptr);
+# endif
if (!HasProvisioner)
{
@@ -1011,6 +1172,17 @@ ZenComputeServer::Run()
}
# endif
+# if ZEN_WITH_OVERMIND
+ // Start Overmind provisioning if configured - request maximum allowed cores.
+ // SetTargetCoreCount clamps to OvermindConfig::MaxCores internally.
+ if (m_OvermindProvisioner)
+ {
+ m_OvermindProvisioner->SetTargetCoreCount(UINT32_MAX);
+ auto Stats = m_OvermindProvisioner->GetStats();
+ ZEN_INFO("Overmind provisioning started (target cores: {})", Stats.TargetCoreCount);
+ }
+# endif
+
EnqueueProvisionerMaintenanceTimer();
m_Http->Run(IsInteractiveMode);
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
index aa9c1a5b3..38705d2e4 100644
--- a/src/zenserver/compute/computeserver.h
+++ b/src/zenserver/compute/computeserver.h
@@ -40,6 +40,13 @@ class NomadProvisioner;
} // namespace zen::nomad
# endif
+# if ZEN_WITH_OVERMIND
+# include <zenovermind/overmindconfig.h>
+namespace zen::overmind {
+class OvermindProvisioner;
+} // namespace zen::overmind
+# endif
+
namespace zen {
class HttpApiService;
@@ -64,6 +71,10 @@ struct ZenComputeServerConfig : public ZenServerConfig
# if ZEN_WITH_NOMAD
nomad::NomadConfig NomadConfig;
# endif
+
+# if ZEN_WITH_OVERMIND
+ overmind::OvermindConfig OvermindConfig;
+# endif
};
struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
@@ -95,6 +106,10 @@ private:
std::string m_NomadDriverStr = "raw_exec";
std::string m_NomadDistributionStr = "predeployed";
# endif
+
+# if ZEN_WITH_OVERMIND
+ // No string-to-enum options needed for Overmind yet
+# endif
};
class ZenComputeServerMain : public ZenServerMain
@@ -150,6 +165,9 @@ private:
# if ZEN_WITH_NOMAD
std::unique_ptr<zen::nomad::NomadProvisioner> m_NomadProvisioner;
# endif
+# if ZEN_WITH_OVERMIND
+ std::unique_ptr<zen::overmind::OvermindProvisioner> m_OvermindProvisioner;
+# endif
SystemMetricsTracker m_MetricsTracker;
std::string m_CoordinatorEndpoint;
std::string m_CoordinatorSession;
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua
index b609d1050..e8fb3d3e7 100644
--- a/src/zenserver/xmake.lua
+++ b/src/zenserver/xmake.lua
@@ -57,6 +57,10 @@ target("zenserver")
add_deps("zennomad")
end
+ if has_config("zenovermind") then
+ add_deps("zenovermind")
+ end
+
if is_mode("release") then
set_optimize("fastest")
end
diff --git a/xmake.lua b/xmake.lua
index d2e061852..ab940aeb6 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -442,6 +442,13 @@ option("zennomad")
option_end()
add_define_by_config("ZEN_WITH_NOMAD", "zennomad")
+option("zenovermind")
+ set_default(compute_default)
+ set_showmenu(true)
+ set_description("Enable Overmind worker provisioning")
+option_end()
+add_define_by_config("ZEN_WITH_OVERMIND", "zenovermind")
+
if is_os("windows") then
add_defines("UE_MEMORY_TRACE_AVAILABLE=1")
@@ -501,6 +508,9 @@ end
if has_config("zennomad") then
includes("src/zennomad")
end
+if has_config("zenovermind") then
+ includes("src/zenovermind")
+end
includes("src/zenstore", "src/zenstore-test")
includes("src/zentelemetry", "src/zentelemetry-test")
includes("src/zenutil", "src/zenutil-test")