// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include namespace zen::overmind { OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config, std::string_view OrchestratorEndpoint, std::string_view CoordinatorSession, bool CleanStart, std::string_view TraceHost) : m_Config(Config) , m_OrchestratorEndpoint(OrchestratorEndpoint) , m_CoordinatorSession(CoordinatorSession) , m_CleanStart(CleanStart) , m_TraceHost(TraceHost) , m_Log(zen::logging::Get("overmind.provisioner")) { ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})", m_Config.ServerUrl, m_Config.Namespace, m_Config.MaxCores, m_Config.CoresPerJob, m_Config.MaxJobs); m_Client = std::make_unique(m_Config); if (!m_Client->Initialize()) { ZEN_ERROR("failed to initialize Overmind HTTP client"); return; } ZEN_DEBUG("Overmind HTTP client initialized, starting management thread"); m_Thread = std::thread([this] { ManagementThread(); }); } OvermindProvisioner::~OvermindProvisioner() { ZEN_DEBUG("provisioner shutting down"); m_ShouldExit.store(true); m_WakeCV.notify_all(); if (m_Thread.joinable()) { m_Thread.join(); } CancelAllJobs(); ZEN_DEBUG("provisioner shutdown complete"); } void OvermindProvisioner::SetTargetCoreCount(uint32_t Count) { const uint32_t Clamped = std::min(Count, static_cast(m_Config.MaxCores)); const uint32_t Previous = m_TargetCoreCount.exchange(Clamped); if (Clamped != Previous) { ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped); } m_WakeCV.notify_all(); } OvermindProvisioningStats OvermindProvisioner::GetStats() const { OvermindProvisioningStats Stats; Stats.TargetCoreCount = m_TargetCoreCount.load(); Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); Stats.RunningJobCount = m_RunningJobCount.load(); { std::lock_guard Lock(m_JobsLock); Stats.ActiveJobCount = static_cast(m_Jobs.size()); } return Stats; } void OvermindProvisioner::ManagementThread() { ZEN_TRACE_CPU("Overmind_Mgmt"); zen::SetCurrentThreadName("overmind_mgmt"); ZEN_INFO("Overmind management thread started"); while (!m_ShouldExit.load()) { ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}", m_TargetCoreCount.load(), m_EstimatedCoreCount.load(), m_RunningJobCount.load(), [this] { std::lock_guard Lock(m_JobsLock); return m_Jobs.size(); }()); SubmitNewJobs(); PollExistingJobs(); CleanupFinishedJobs(); // Wait up to 5 seconds or until woken std::unique_lock Lock(m_WakeMutex); m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); }); } ZEN_INFO("Overmind management thread exiting"); } void OvermindProvisioner::SubmitNewJobs() { ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs"); const uint32_t CoresPerJob = static_cast(m_Config.CoresPerJob); while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) { { std::lock_guard Lock(m_JobsLock); if (static_cast(m_Jobs.size()) >= m_Config.MaxJobs) { ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs); break; } } if (m_ShouldExit.load()) { break; } const uint32_t Index = m_JobIndex.fetch_add(1); ExtendableStringBuilder<128> NameBuilder; NameBuilder << m_Config.JobName << "-" << Index; const std::string JobName(NameBuilder.ToView()); ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load()); const std::string JobJson = m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost); OvermindJobInfo JobInfo; if (!m_Client->ScheduleJob(JobJson, JobInfo)) { ZEN_WARN("failed to schedule Overmind job '{}'", JobName); break; } TrackedJob Tracked; Tracked.Id = JobInfo.Id; Tracked.Status = JobInfo.Status; Tracked.Cores = static_cast(CoresPerJob); { std::lock_guard Lock(m_JobsLock); m_Jobs.push_back(std::move(Tracked)); } m_EstimatedCoreCount.fetch_add(CoresPerJob); ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load()); } } void OvermindProvisioner::PollExistingJobs() { ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs"); std::lock_guard Lock(m_JobsLock); for (auto& Job : m_Jobs) { if (m_ShouldExit.load()) { break; } OvermindJobInfo Info; if (!m_Client->GetJobStatus(Job.Id, Info)) { ZEN_DEBUG("failed to poll status for job '{}'", Job.Id); continue; } const std::string PrevStatus = Job.Status; Job.Status = Info.Status; if (PrevStatus != Job.Status) { ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Job.Id, PrevStatus, Job.Status); if (Job.Status == "STATUS_RUNNING" && PrevStatus != "STATUS_RUNNING") { m_RunningJobCount.fetch_add(1); } else if (Job.Status != "STATUS_RUNNING" && PrevStatus == "STATUS_RUNNING") { m_RunningJobCount.fetch_sub(1); } } } } void OvermindProvisioner::CleanupFinishedJobs() { ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs"); std::lock_guard Lock(m_JobsLock); for (auto It = m_Jobs.begin(); It != m_Jobs.end();) { if (It->Status == "STATUS_COMPLETE" || It->Status == "STATUS_ERROR") { ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", It->Id, It->Status); m_EstimatedCoreCount.fetch_sub(static_cast(It->Cores)); It = m_Jobs.erase(It); } else { ++It; } } } void OvermindProvisioner::CancelAllJobs() { ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs"); std::lock_guard Lock(m_JobsLock); for (const auto& Job : m_Jobs) { ZEN_INFO("cancelling Overmind job '{}' during shutdown", Job.Id); m_Client->CancelJob(Job.Id); } m_Jobs.clear(); m_EstimatedCoreCount.store(0); m_RunningJobCount.store(0); } } // namespace zen::overmind