diff options
| -rw-r--r-- | src/zenovermind/include/zenovermind/overmindclient.h | 69 | ||||
| -rw-r--r-- | src/zenovermind/include/zenovermind/overmindconfig.h | 45 | ||||
| -rw-r--r-- | src/zenovermind/include/zenovermind/overmindprovisioner.h | 110 | ||||
| -rw-r--r-- | src/zenovermind/include/zenovermind/zenovermind.h | 9 | ||||
| -rw-r--r-- | src/zenovermind/overmindclient.cpp | 254 | ||||
| -rw-r--r-- | src/zenovermind/overmindconfig.cpp | 28 | ||||
| -rw-r--r-- | src/zenovermind/overmindprovisioner.cpp | 262 | ||||
| -rw-r--r-- | src/zenovermind/xmake.lua | 10 | ||||
| -rw-r--r-- | src/zenserver/compute/computeserver.cpp | 172 | ||||
| -rw-r--r-- | src/zenserver/compute/computeserver.h | 18 | ||||
| -rw-r--r-- | src/zenserver/xmake.lua | 4 | ||||
| -rw-r--r-- | xmake.lua | 10 |
12 files changed, 991 insertions, 0 deletions
diff --git a/src/zenovermind/include/zenovermind/overmindclient.h b/src/zenovermind/include/zenovermind/overmindclient.h new file mode 100644 index 000000000..68348b4a6 --- /dev/null +++ b/src/zenovermind/include/zenovermind/overmindclient.h @@ -0,0 +1,69 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenovermind/overmindconfig.h> + +#include <zencore/logbase.h> + +#include <memory> +#include <string> + +namespace zen { +class HttpClient; +} + +namespace zen::overmind { + +/** Summary of an Overmind job returned by the REST API. */ +struct OvermindJobInfo +{ + std::string Id; + std::string Status; ///< "STATUS_PENDING", "STATUS_RUNNING", "STATUS_COMPLETE", "STATUS_ERROR", ... +}; + +/** HTTP client for the Overmind REST gateway (v1). + * + * Handles job scheduling, status polling, and job cancellation via the + * grpc-gateway REST endpoints on port 2580. + * + * All calls are synchronous. Thread safety: individual methods are + * not thread-safe; callers must synchronize access. + */ +class OvermindClient +{ +public: + explicit OvermindClient(const OvermindConfig& Config); + ~OvermindClient(); + + OvermindClient(const OvermindClient&) = delete; + OvermindClient& operator=(const OvermindClient&) = delete; + + /** Initialize the underlying HTTP client. Must be called before other methods. */ + bool Initialize(); + + /** Build the JSON body for a ScheduleJob request. */ + std::string BuildJobJson(const std::string& JobName, + const std::string& OrchestratorEndpoint, + const std::string& CoordinatorSession = {}, + bool CleanStart = false, + const std::string& TraceHost = {}) const; + + /** Schedule a job via POST /v1/jobs. On success, populates OutJob. */ + bool ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob); + + /** Get the status of a job via GET /v1/jobs/{jobId}. */ + bool GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob); + + /** Cancel a job via DELETE /v1/jobs/{jobId}. */ + bool CancelJob(const std::string& JobId); + + LoggerRef Log() { return m_Log; } + +private: + OvermindConfig m_Config; + std::unique_ptr<zen::HttpClient> m_Http; + LoggerRef m_Log; +}; + +} // namespace zen::overmind diff --git a/src/zenovermind/include/zenovermind/overmindconfig.h b/src/zenovermind/include/zenovermind/overmindconfig.h new file mode 100644 index 000000000..a463e31ea --- /dev/null +++ b/src/zenovermind/include/zenovermind/overmindconfig.h @@ -0,0 +1,45 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenovermind/zenovermind.h> + +#include <string> + +namespace zen::overmind { + +/** Configuration for Overmind worker provisioning. + * + * Specifies the Overmind REST gateway URL, authentication, namespace, + * region, and resource limits. Used by OvermindClient and OvermindProvisioner. + */ +struct OvermindConfig +{ + bool Enabled = false; ///< Whether Overmind provisioning is active + std::string ServerUrl; ///< Overmind REST gateway URL (e.g. "http://localhost:2580") + std::string AuthToken; ///< JWT bearer token for authentication + + std::string Namespace; ///< Overmind namespace for job submission + std::string Region; ///< Target region (e.g. "REGION_US_EAST") + + /** Overmind command reference for the zenserver binary in + * "namespace:name:version" format (e.g. "infra:zenserver:v1.0.0"). */ + std::string CommandRef; + + std::string Os = "OPERATING_SYSTEM_LINUX"; ///< Target operating system + std::string Arch = "CPU_ARCHITECTURE_X86_64"; ///< Target CPU architecture + + std::string Memory = "4GiB"; ///< Memory per task + std::string Cpu = "2000m"; ///< CPU per task (millicores) + + int MaxJobs = 64; ///< Maximum concurrent Overmind jobs + int CoresPerJob = 32; ///< Estimated cores per job (for scaling calculations) + int MaxCores = 2048; ///< Maximum total cores to provision + + std::string JobName = "zenserver-worker"; ///< Name for generated Overmind jobs + + /** Validate the configuration. Returns false if required fields are missing. */ + bool Validate() const; +}; + +} // namespace zen::overmind diff --git a/src/zenovermind/include/zenovermind/overmindprovisioner.h b/src/zenovermind/include/zenovermind/overmindprovisioner.h new file mode 100644 index 000000000..cb0a84728 --- /dev/null +++ b/src/zenovermind/include/zenovermind/overmindprovisioner.h @@ -0,0 +1,110 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenovermind/overmindconfig.h> + +#include <zencore/logbase.h> + +#include <atomic> +#include <condition_variable> +#include <cstdint> +#include <memory> +#include <mutex> +#include <string> +#include <thread> +#include <vector> + +namespace zen::overmind { + +class OvermindClient; + +/** Snapshot of the current Overmind provisioning state, returned by OvermindProvisioner::GetStats(). */ +struct OvermindProvisioningStats +{ + uint32_t TargetCoreCount = 0; ///< Requested number of cores (clamped to MaxCores) + uint32_t EstimatedCoreCount = 0; ///< Cores expected from submitted jobs + uint32_t ActiveJobCount = 0; ///< Number of currently tracked Overmind jobs + uint32_t RunningJobCount = 0; ///< Number of jobs in running status +}; + +/** Job lifecycle manager for Overmind worker provisioning. + * + * Provisions remote compute workers by scheduling jobs via the Overmind + * REST gateway. Each job runs zenserver in compute mode, which + * announces itself back to the orchestrator. + * + * Uses a single management thread that periodically: + * 1. Submits new jobs when estimated cores < target cores + * 2. Polls existing jobs for status changes + * 3. Cleans up completed/failed jobs and adjusts counters + * + * Thread safety: SetTargetCoreCount and GetStats may be called from any thread. + */ +class OvermindProvisioner +{ +public: + /** Construct a provisioner. + * @param Config Overmind connection and job configuration. + * @param OrchestratorEndpoint URL of the orchestrator that remote workers announce to. */ + OvermindProvisioner(const OvermindConfig& Config, + std::string_view OrchestratorEndpoint, + std::string_view CoordinatorSession = {}, + bool CleanStart = false, + std::string_view TraceHost = {}); + + /** Signals the management thread to exit and cancels all tracked jobs. */ + ~OvermindProvisioner(); + + OvermindProvisioner(const OvermindProvisioner&) = delete; + OvermindProvisioner& operator=(const OvermindProvisioner&) = delete; + + /** Set the target number of cores to provision. + * Clamped to OvermindConfig::MaxCores. The management thread will + * schedule new jobs to approach this target. */ + void SetTargetCoreCount(uint32_t Count); + + /** Return a snapshot of the current provisioning counters. */ + OvermindProvisioningStats GetStats() const; + +private: + LoggerRef Log() { return m_Log; } + + struct TrackedJob + { + std::string Id; + std::string Status; ///< Overmind status string + int Cores = 0; + }; + + void ManagementThread(); + void SubmitNewJobs(); + void PollExistingJobs(); + void CleanupFinishedJobs(); + void CancelAllJobs(); + + OvermindConfig m_Config; + std::string m_OrchestratorEndpoint; + std::string m_CoordinatorSession; + bool m_CleanStart = false; + std::string m_TraceHost; + + std::unique_ptr<OvermindClient> m_Client; + + mutable std::mutex m_JobsLock; + std::vector<TrackedJob> m_Jobs; + std::atomic<uint32_t> m_JobIndex{0}; + + std::atomic<uint32_t> m_TargetCoreCount{0}; + std::atomic<uint32_t> m_EstimatedCoreCount{0}; + std::atomic<uint32_t> m_RunningJobCount{0}; + + std::thread m_Thread; + std::mutex m_WakeMutex; + std::condition_variable m_WakeCV; + std::atomic<bool> m_ShouldExit{false}; + + LoggerRef m_Log; +}; + +} // namespace zen::overmind diff --git a/src/zenovermind/include/zenovermind/zenovermind.h b/src/zenovermind/include/zenovermind/zenovermind.h new file mode 100644 index 000000000..b7f451a16 --- /dev/null +++ b/src/zenovermind/include/zenovermind/zenovermind.h @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#if !defined(ZEN_WITH_OVERMIND) +# define ZEN_WITH_OVERMIND 1 +#endif diff --git a/src/zenovermind/overmindclient.cpp b/src/zenovermind/overmindclient.cpp new file mode 100644 index 000000000..6d01437bf --- /dev/null +++ b/src/zenovermind/overmindclient.cpp @@ -0,0 +1,254 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenovermind/overmindclient.h> + +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/memoryview.h> +#include <zencore/trace.h> +#include <zenhttp/httpclient.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::overmind { + +namespace { + + HttpClient::KeyValueMap MakeOvermindHeaders(const OvermindConfig& Config) + { + HttpClient::KeyValueMap Headers; + if (!Config.AuthToken.empty()) + { + Headers->emplace("Authorization", "Bearer " + Config.AuthToken); + } + return Headers; + } + +} // namespace + +OvermindClient::OvermindClient(const OvermindConfig& Config) : m_Config(Config), m_Log(zen::logging::Get("overmind.client")) +{ +} + +OvermindClient::~OvermindClient() = default; + +bool +OvermindClient::Initialize() +{ + ZEN_TRACE_CPU("OvermindClient::Initialize"); + + HttpClientSettings Settings; + Settings.LogCategory = "overmind.http"; + Settings.ConnectTimeout = std::chrono::milliseconds{10000}; + Settings.Timeout = std::chrono::milliseconds{60000}; + Settings.RetryCount = 1; + + // Ensure the base URL ends with a slash so path concatenation works correctly + std::string BaseUrl = m_Config.ServerUrl; + if (!BaseUrl.empty() && BaseUrl.back() != '/') + { + BaseUrl += '/'; + } + + m_Http = std::make_unique<zen::HttpClient>(BaseUrl, Settings); + + return true; +} + +std::string +OvermindClient::BuildJobJson(const std::string& JobName, + const std::string& OrchestratorEndpoint, + const std::string& CoordinatorSession, + bool CleanStart, + const std::string& TraceHost) const +{ + ZEN_TRACE_CPU("OvermindClient::BuildJobJson"); + + // Build the args array for the zenserver compute command + json11::Json::array Args; + Args.push_back("compute"); + Args.push_back("--http=asio"); + + if (!OrchestratorEndpoint.empty()) + { + ExtendableStringBuilder<256> CoordArg; + CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint; + Args.push_back(std::string(CoordArg.ToView())); + } + + { + ExtendableStringBuilder<128> IdArg; + IdArg << "--instance-id=overmind-" << JobName; + Args.push_back(std::string(IdArg.ToView())); + } + + if (!CoordinatorSession.empty()) + { + ExtendableStringBuilder<128> SessionArg; + SessionArg << "--coordinator-session=" << CoordinatorSession; + Args.push_back(std::string(SessionArg.ToView())); + } + + if (CleanStart) + { + Args.push_back("--clean"); + } + + if (!TraceHost.empty()) + { + ExtendableStringBuilder<128> TraceArg; + TraceArg << "--tracehost=" << TraceHost; + Args.push_back(std::string(TraceArg.ToView())); + } + + json11::Json Task = json11::Json::object{ + {"name", "zenserver"}, + {"type", "TASK_TYPE_MAIN"}, + {"command", m_Config.CommandRef}, + {"args", Args}, + {"resources", + json11::Json::object{ + {"memory", m_Config.Memory}, + {"cpu", m_Config.Cpu}, + }}, + }; + + json11::Json Body = json11::Json::object{ + {"namespace", m_Config.Namespace}, + {"region", m_Config.Region}, + {"definition", + json11::Json::object{ + {"name", JobName}, + {"os", m_Config.Os}, + {"arch", m_Config.Arch}, + {"tasks", json11::Json::array{Task}}, + }}, + }; + + return Body.dump(); +} + +bool +OvermindClient::ScheduleJob(const std::string& JobJson, OvermindJobInfo& OutJob) +{ + ZEN_TRACE_CPU("OvermindClient::ScheduleJob"); + + const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{JobJson.data(), JobJson.size()}, ZenContentType::kJSON); + + const HttpClient::Response Response = m_Http->Post("v1/jobs", Payload, MakeOvermindHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Overmind job schedule failed: {}", Response.Error->ErrorMessage); + return false; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Overmind job schedule failed with HTTP/{}", static_cast<int>(Response.StatusCode)); + return false; + } + + const std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("invalid JSON response from Overmind job schedule: {}", Err); + return false; + } + + const auto& Job = Json["job"]; + OutJob.Id = Job["id"].string_value(); + OutJob.Status = Job["status"].string_value(); + + if (OutJob.Id.empty()) + { + ZEN_WARN("Overmind job schedule response missing job ID"); + return false; + } + + ZEN_INFO("Overmind job scheduled: id={}", OutJob.Id); + + return true; +} + +bool +OvermindClient::GetJobStatus(const std::string& JobId, OvermindJobInfo& OutJob) +{ + ZEN_TRACE_CPU("OvermindClient::GetJobStatus"); + + ExtendableStringBuilder<128> Path; + Path << "v1/jobs/" << JobId; + + const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeOvermindHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Overmind job status query failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return false; + } + + const int StatusCode = static_cast<int>(Response.StatusCode); + + if (StatusCode == 404) + { + ZEN_INFO("Overmind job '{}' not found", JobId); + OutJob.Status = "STATUS_ERROR"; + return true; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Overmind job status query failed with HTTP/{}", StatusCode); + return false; + } + + const std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("invalid JSON in Overmind job status response: {}", Err); + return false; + } + + const auto& Job = Json["job"]; + OutJob.Id = Job["id"].string_value(); + OutJob.Status = Job["status"].string_value(); + + return true; +} + +bool +OvermindClient::CancelJob(const std::string& JobId) +{ + ZEN_TRACE_CPU("OvermindClient::CancelJob"); + + ExtendableStringBuilder<256> Path; + Path << "v1/jobs/" << JobId << "?namespace=" << m_Config.Namespace; + + const HttpClient::Response Response = m_Http->Delete(Path.ToView(), MakeOvermindHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Overmind job cancel failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return false; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Overmind job cancel failed with HTTP/{}", static_cast<int>(Response.StatusCode)); + return false; + } + + ZEN_INFO("Overmind job '{}' cancelled", JobId); + return true; +} + +} // namespace zen::overmind diff --git a/src/zenovermind/overmindconfig.cpp b/src/zenovermind/overmindconfig.cpp new file mode 100644 index 000000000..63e955725 --- /dev/null +++ b/src/zenovermind/overmindconfig.cpp @@ -0,0 +1,28 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenovermind/overmindconfig.h> + +namespace zen::overmind { + +bool +OvermindConfig::Validate() const +{ + if (ServerUrl.empty()) + { + return false; + } + + if (Namespace.empty()) + { + return false; + } + + if (CommandRef.empty()) + { + return false; + } + + return true; +} + +} // namespace zen::overmind diff --git a/src/zenovermind/overmindprovisioner.cpp b/src/zenovermind/overmindprovisioner.cpp new file mode 100644 index 000000000..9464fbaed --- /dev/null +++ b/src/zenovermind/overmindprovisioner.cpp @@ -0,0 +1,262 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenovermind/overmindclient.h> +#include <zenovermind/overmindprovisioner.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <chrono> + +namespace zen::overmind { + +OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config, + std::string_view OrchestratorEndpoint, + std::string_view CoordinatorSession, + bool CleanStart, + std::string_view TraceHost) +: m_Config(Config) +, m_OrchestratorEndpoint(OrchestratorEndpoint) +, m_CoordinatorSession(CoordinatorSession) +, m_CleanStart(CleanStart) +, m_TraceHost(TraceHost) +, m_Log(zen::logging::Get("overmind.provisioner")) +{ + ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})", + m_Config.ServerUrl, + m_Config.Namespace, + m_Config.MaxCores, + m_Config.CoresPerJob, + m_Config.MaxJobs); + + m_Client = std::make_unique<OvermindClient>(m_Config); + if (!m_Client->Initialize()) + { + ZEN_ERROR("failed to initialize Overmind HTTP client"); + return; + } + + ZEN_DEBUG("Overmind HTTP client initialized, starting management thread"); + + m_Thread = std::thread([this] { ManagementThread(); }); +} + +OvermindProvisioner::~OvermindProvisioner() +{ + ZEN_DEBUG("provisioner shutting down"); + + m_ShouldExit.store(true); + m_WakeCV.notify_all(); + + if (m_Thread.joinable()) + { + m_Thread.join(); + } + + CancelAllJobs(); + + ZEN_DEBUG("provisioner shutdown complete"); +} + +void +OvermindProvisioner::SetTargetCoreCount(uint32_t Count) +{ + const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)); + const uint32_t Previous = m_TargetCoreCount.exchange(Clamped); + + if (Clamped != Previous) + { + ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped); + } + + m_WakeCV.notify_all(); +} + +OvermindProvisioningStats +OvermindProvisioner::GetStats() const +{ + OvermindProvisioningStats Stats; + Stats.TargetCoreCount = m_TargetCoreCount.load(); + Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); + Stats.RunningJobCount = m_RunningJobCount.load(); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size()); + } + + return Stats; +} + +void +OvermindProvisioner::ManagementThread() +{ + ZEN_TRACE_CPU("Overmind_Mgmt"); + zen::SetCurrentThreadName("overmind_mgmt"); + + ZEN_INFO("Overmind management thread started"); + + while (!m_ShouldExit.load()) + { + ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}", + m_TargetCoreCount.load(), + m_EstimatedCoreCount.load(), + m_RunningJobCount.load(), + [this] { + std::lock_guard<std::mutex> Lock(m_JobsLock); + return m_Jobs.size(); + }()); + + SubmitNewJobs(); + PollExistingJobs(); + CleanupFinishedJobs(); + + // Wait up to 5 seconds or until woken + std::unique_lock<std::mutex> Lock(m_WakeMutex); + m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); }); + } + + ZEN_INFO("Overmind management thread exiting"); +} + +void +OvermindProvisioner::SubmitNewJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs"); + + const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob); + + while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) + { + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs) + { + ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs); + break; + } + } + + if (m_ShouldExit.load()) + { + break; + } + + const uint32_t Index = m_JobIndex.fetch_add(1); + + ExtendableStringBuilder<128> NameBuilder; + NameBuilder << m_Config.JobName << "-" << Index; + const std::string JobName(NameBuilder.ToView()); + + ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load()); + + const std::string JobJson = + m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost); + + OvermindJobInfo JobInfo; + if (!m_Client->ScheduleJob(JobJson, JobInfo)) + { + ZEN_WARN("failed to schedule Overmind job '{}'", JobName); + break; + } + + TrackedJob Tracked; + Tracked.Id = JobInfo.Id; + Tracked.Status = JobInfo.Status; + Tracked.Cores = static_cast<int>(CoresPerJob); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + m_Jobs.push_back(std::move(Tracked)); + } + + m_EstimatedCoreCount.fetch_add(CoresPerJob); + + ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load()); + } +} + +void +OvermindProvisioner::PollExistingJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto& Job : m_Jobs) + { + if (m_ShouldExit.load()) + { + break; + } + + OvermindJobInfo Info; + if (!m_Client->GetJobStatus(Job.Id, Info)) + { + ZEN_DEBUG("failed to poll status for job '{}'", Job.Id); + continue; + } + + const std::string PrevStatus = Job.Status; + Job.Status = Info.Status; + + if (PrevStatus != Job.Status) + { + ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Job.Id, PrevStatus, Job.Status); + + if (Job.Status == "STATUS_RUNNING" && PrevStatus != "STATUS_RUNNING") + { + m_RunningJobCount.fetch_add(1); + } + else if (Job.Status != "STATUS_RUNNING" && PrevStatus == "STATUS_RUNNING") + { + m_RunningJobCount.fetch_sub(1); + } + } + } +} + +void +OvermindProvisioner::CleanupFinishedJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto It = m_Jobs.begin(); It != m_Jobs.end();) + { + if (It->Status == "STATUS_COMPLETE" || It->Status == "STATUS_ERROR") + { + ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", It->Id, It->Status); + m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores)); + It = m_Jobs.erase(It); + } + else + { + ++It; + } + } +} + +void +OvermindProvisioner::CancelAllJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (const auto& Job : m_Jobs) + { + ZEN_INFO("cancelling Overmind job '{}' during shutdown", Job.Id); + m_Client->CancelJob(Job.Id); + } + + m_Jobs.clear(); + m_EstimatedCoreCount.store(0); + m_RunningJobCount.store(0); +} + +} // namespace zen::overmind diff --git a/src/zenovermind/xmake.lua b/src/zenovermind/xmake.lua new file mode 100644 index 000000000..49d667f27 --- /dev/null +++ b/src/zenovermind/xmake.lua @@ -0,0 +1,10 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target('zenovermind') + set_kind("static") + set_group("libs") + add_headerfiles("**.h") + add_files("**.cpp") + add_includedirs("include", {public=true}) + add_deps("zencore", "zenhttp", "zenutil") + add_packages("json11") diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index f35fe0f97..89886beee 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -29,6 +29,10 @@ # include <zennomad/nomadconfig.h> # include <zennomad/nomadprovisioner.h> # endif +# if ZEN_WITH_OVERMIND +# include <zenovermind/overmindconfig.h> +# include <zenovermind/overmindprovisioner.h> +# endif ZEN_THIRD_PARTY_INCLUDES_START # include <cxxopts.hpp> @@ -331,6 +335,107 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value<std::string>(m_ServerOptions.NomadConfig.JobPrefix)->default_value("zenserver-worker"), ""); # endif + +# if ZEN_WITH_OVERMIND + // Overmind provisioning options + Options.add_option("overmind", + "", + "overmind-enabled", + "Enable Overmind worker provisioning", + cxxopts::value<bool>(m_ServerOptions.OvermindConfig.Enabled)->default_value("false"), + ""); + + Options.add_option("overmind", + "", + "overmind-server", + "Overmind REST gateway URL", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.ServerUrl)->default_value(""), + ""); + + Options.add_option("overmind", + "", + "overmind-token", + "Overmind JWT bearer token", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.AuthToken)->default_value(""), + ""); + + Options.add_option("overmind", + "", + "overmind-namespace", + "Overmind namespace for job submission", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Namespace)->default_value(""), + ""); + + Options.add_option("overmind", + "", + "overmind-region", + "Overmind target region (e.g. REGION_US_EAST)", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Region)->default_value(""), + ""); + + Options.add_option("overmind", + "", + "overmind-command", + "Overmind command reference (namespace:name:version)", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.CommandRef)->default_value(""), + ""); + + Options.add_option("overmind", + "", + "overmind-os", + "Target operating system for Overmind jobs", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Os)->default_value("OPERATING_SYSTEM_LINUX"), + ""); + + Options.add_option("overmind", + "", + "overmind-arch", + "Target CPU architecture for Overmind jobs", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Arch)->default_value("CPU_ARCHITECTURE_X86_64"), + ""); + + Options.add_option("overmind", + "", + "overmind-memory", + "Memory per Overmind task (e.g. 4GiB)", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Memory)->default_value("4GiB"), + ""); + + Options.add_option("overmind", + "", + "overmind-cpu", + "CPU per Overmind task in millicores (e.g. 2000m)", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.Cpu)->default_value("2000m"), + ""); + + Options.add_option("overmind", + "", + "overmind-max-jobs", + "Maximum concurrent Overmind jobs", + cxxopts::value<int>(m_ServerOptions.OvermindConfig.MaxJobs)->default_value("64"), + ""); + + Options.add_option("overmind", + "", + "overmind-cores-per-job", + "Estimated cores per Overmind job (for scaling)", + cxxopts::value<int>(m_ServerOptions.OvermindConfig.CoresPerJob)->default_value("32"), + ""); + + Options.add_option("overmind", + "", + "overmind-max-cores", + "Maximum total cores to provision via Overmind", + cxxopts::value<int>(m_ServerOptions.OvermindConfig.MaxCores)->default_value("2048"), + ""); + + Options.add_option("overmind", + "", + "overmind-job-name", + "Name for generated Overmind jobs", + cxxopts::value<std::string>(m_ServerOptions.OvermindConfig.JobName)->default_value("zenserver-worker"), + ""); +# endif } void @@ -467,6 +572,12 @@ ZenComputeServer::Cleanup() m_NomadProvisioner.reset(); # endif +# if ZEN_WITH_OVERMIND + // Shut down Overmind provisioner - stops the management thread and + // cancels all tracked jobs. + m_OvermindProvisioner.reset(); +# endif + // Close the orchestrator WebSocket client before stopping the io_context m_WsReconnectTimer.cancel(); if (m_OrchestratorWsClient) @@ -630,6 +741,36 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) } } # endif + +# if ZEN_WITH_OVERMIND + // Overmind provisioner + if (ServerConfig.OvermindConfig.Enabled && !ServerConfig.OvermindConfig.ServerUrl.empty()) + { + ZEN_INFO("instantiating Overmind provisioner (server: {})", ServerConfig.OvermindConfig.ServerUrl); + + const auto& OvermindCfg = ServerConfig.OvermindConfig; + + if (!OvermindCfg.Validate()) + { + ZEN_ERROR("invalid Overmind configuration"); + } + else + { + ExtendableStringBuilder<256> OrchestratorEndpoint; + OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get()); + if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/') + { + OrchestratorEndpoint << '/'; + } + + m_OvermindProvisioner = std::make_unique<overmind::OvermindProvisioner>(OvermindCfg, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); + } + } +# endif } void @@ -727,6 +868,11 @@ ZenComputeServer::BuildAnnounceBody() AnnounceBody << "provisioner" << "nomad"; } + else if (m_InstanceId.starts_with("overmind-")) + { + AnnounceBody << "provisioner" + << "overmind"; + } if (!m_CoordinatorSession.empty()) { @@ -901,6 +1047,18 @@ ZenComputeServer::ProvisionerMaintenanceTick() Stats.RunningJobCount); } # endif + +# if ZEN_WITH_OVERMIND + if (m_OvermindProvisioner) + { + m_OvermindProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_OvermindProvisioner->GetStats(); + ZEN_DEBUG("Overmind maintenance: target={}, estimated={}, running jobs={}", + Stats.TargetCoreCount, + Stats.EstimatedCoreCount, + Stats.RunningJobCount); + } +# endif } void @@ -913,6 +1071,9 @@ ZenComputeServer::EnqueueProvisionerMaintenanceTimer() # if ZEN_WITH_NOMAD HasProvisioner = HasProvisioner || (m_NomadProvisioner != nullptr); # endif +# if ZEN_WITH_OVERMIND + HasProvisioner = HasProvisioner || (m_OvermindProvisioner != nullptr); +# endif if (!HasProvisioner) { @@ -1011,6 +1172,17 @@ ZenComputeServer::Run() } # endif +# if ZEN_WITH_OVERMIND + // Start Overmind provisioning if configured - request maximum allowed cores. + // SetTargetCoreCount clamps to OvermindConfig::MaxCores internally. + if (m_OvermindProvisioner) + { + m_OvermindProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_OvermindProvisioner->GetStats(); + ZEN_INFO("Overmind provisioning started (target cores: {})", Stats.TargetCoreCount); + } +# endif + EnqueueProvisionerMaintenanceTimer(); m_Http->Run(IsInteractiveMode); diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h index aa9c1a5b3..38705d2e4 100644 --- a/src/zenserver/compute/computeserver.h +++ b/src/zenserver/compute/computeserver.h @@ -40,6 +40,13 @@ class NomadProvisioner; } // namespace zen::nomad # endif +# if ZEN_WITH_OVERMIND +# include <zenovermind/overmindconfig.h> +namespace zen::overmind { +class OvermindProvisioner; +} // namespace zen::overmind +# endif + namespace zen { class HttpApiService; @@ -64,6 +71,10 @@ struct ZenComputeServerConfig : public ZenServerConfig # if ZEN_WITH_NOMAD nomad::NomadConfig NomadConfig; # endif + +# if ZEN_WITH_OVERMIND + overmind::OvermindConfig OvermindConfig; +# endif }; struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase @@ -95,6 +106,10 @@ private: std::string m_NomadDriverStr = "raw_exec"; std::string m_NomadDistributionStr = "predeployed"; # endif + +# if ZEN_WITH_OVERMIND + // No string-to-enum options needed for Overmind yet +# endif }; class ZenComputeServerMain : public ZenServerMain @@ -150,6 +165,9 @@ private: # if ZEN_WITH_NOMAD std::unique_ptr<zen::nomad::NomadProvisioner> m_NomadProvisioner; # endif +# if ZEN_WITH_OVERMIND + std::unique_ptr<zen::overmind::OvermindProvisioner> m_OvermindProvisioner; +# endif SystemMetricsTracker m_MetricsTracker; std::string m_CoordinatorEndpoint; std::string m_CoordinatorSession; diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index b609d1050..e8fb3d3e7 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -57,6 +57,10 @@ target("zenserver") add_deps("zennomad") end + if has_config("zenovermind") then + add_deps("zenovermind") + end + if is_mode("release") then set_optimize("fastest") end @@ -442,6 +442,13 @@ option("zennomad") option_end() add_define_by_config("ZEN_WITH_NOMAD", "zennomad") +option("zenovermind") + set_default(compute_default) + set_showmenu(true) + set_description("Enable Overmind worker provisioning") +option_end() +add_define_by_config("ZEN_WITH_OVERMIND", "zenovermind") + if is_os("windows") then add_defines("UE_MEMORY_TRACE_AVAILABLE=1") @@ -501,6 +508,9 @@ end if has_config("zennomad") then includes("src/zennomad") end +if has_config("zenovermind") then + includes("src/zenovermind") +end includes("src/zenstore", "src/zenstore-test") includes("src/zentelemetry", "src/zentelemetry-test") includes("src/zenutil", "src/zenutil-test") |