diff options
Diffstat (limited to 'src/zenovermind/overmindprovisioner.cpp')
| -rw-r--r-- | src/zenovermind/overmindprovisioner.cpp | 262 |
1 file changed, 262 insertions, 0 deletions
diff --git a/src/zenovermind/overmindprovisioner.cpp b/src/zenovermind/overmindprovisioner.cpp new file mode 100644 index 000000000..9464fbaed --- /dev/null +++ b/src/zenovermind/overmindprovisioner.cpp @@ -0,0 +1,262 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenovermind/overmindclient.h> +#include <zenovermind/overmindprovisioner.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <chrono> + +namespace zen::overmind { + +OvermindProvisioner::OvermindProvisioner(const OvermindConfig& Config, + std::string_view OrchestratorEndpoint, + std::string_view CoordinatorSession, + bool CleanStart, + std::string_view TraceHost) +: m_Config(Config) +, m_OrchestratorEndpoint(OrchestratorEndpoint) +, m_CoordinatorSession(CoordinatorSession) +, m_CleanStart(CleanStart) +, m_TraceHost(TraceHost) +, m_Log(zen::logging::Get("overmind.provisioner")) +{ + ZEN_DEBUG("initializing provisioner (server: {}, namespace: {}, max_cores: {}, cores_per_job: {}, max_jobs: {})", + m_Config.ServerUrl, + m_Config.Namespace, + m_Config.MaxCores, + m_Config.CoresPerJob, + m_Config.MaxJobs); + + m_Client = std::make_unique<OvermindClient>(m_Config); + if (!m_Client->Initialize()) + { + ZEN_ERROR("failed to initialize Overmind HTTP client"); + return; + } + + ZEN_DEBUG("Overmind HTTP client initialized, starting management thread"); + + m_Thread = std::thread([this] { ManagementThread(); }); +} + +OvermindProvisioner::~OvermindProvisioner() +{ + ZEN_DEBUG("provisioner shutting down"); + + m_ShouldExit.store(true); + m_WakeCV.notify_all(); + + if (m_Thread.joinable()) + { + m_Thread.join(); + } + + CancelAllJobs(); + + ZEN_DEBUG("provisioner shutdown complete"); +} + +void +OvermindProvisioner::SetTargetCoreCount(uint32_t Count) +{ + const uint32_t Clamped = std::min(Count, static_cast<uint32_t>(m_Config.MaxCores)); + const uint32_t Previous = 
m_TargetCoreCount.exchange(Clamped); + + if (Clamped != Previous) + { + ZEN_DEBUG("target core count changed: {} -> {}", Previous, Clamped); + } + + m_WakeCV.notify_all(); +} + +OvermindProvisioningStats +OvermindProvisioner::GetStats() const +{ + OvermindProvisioningStats Stats; + Stats.TargetCoreCount = m_TargetCoreCount.load(); + Stats.EstimatedCoreCount = m_EstimatedCoreCount.load(); + Stats.RunningJobCount = m_RunningJobCount.load(); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + Stats.ActiveJobCount = static_cast<uint32_t>(m_Jobs.size()); + } + + return Stats; +} + +void +OvermindProvisioner::ManagementThread() +{ + ZEN_TRACE_CPU("Overmind_Mgmt"); + zen::SetCurrentThreadName("overmind_mgmt"); + + ZEN_INFO("Overmind management thread started"); + + while (!m_ShouldExit.load()) + { + ZEN_DEBUG("management cycle: target={} estimated={} running={} active={}", + m_TargetCoreCount.load(), + m_EstimatedCoreCount.load(), + m_RunningJobCount.load(), + [this] { + std::lock_guard<std::mutex> Lock(m_JobsLock); + return m_Jobs.size(); + }()); + + SubmitNewJobs(); + PollExistingJobs(); + CleanupFinishedJobs(); + + // Wait up to 5 seconds or until woken + std::unique_lock<std::mutex> Lock(m_WakeMutex); + m_WakeCV.wait_for(Lock, std::chrono::seconds(5), [this] { return m_ShouldExit.load(); }); + } + + ZEN_INFO("Overmind management thread exiting"); +} + +void +OvermindProvisioner::SubmitNewJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::SubmitNewJobs"); + + const uint32_t CoresPerJob = static_cast<uint32_t>(m_Config.CoresPerJob); + + while (m_EstimatedCoreCount.load() < m_TargetCoreCount.load()) + { + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + if (static_cast<int>(m_Jobs.size()) >= m_Config.MaxJobs) + { + ZEN_INFO("Overmind max jobs limit reached ({})", m_Config.MaxJobs); + break; + } + } + + if (m_ShouldExit.load()) + { + break; + } + + const uint32_t Index = m_JobIndex.fetch_add(1); + + ExtendableStringBuilder<128> NameBuilder; + NameBuilder << 
m_Config.JobName << "-" << Index; + const std::string JobName(NameBuilder.ToView()); + + ZEN_DEBUG("scheduling job '{}' (estimated: {}, target: {})", JobName, m_EstimatedCoreCount.load(), m_TargetCoreCount.load()); + + const std::string JobJson = + m_Client->BuildJobJson(JobName, m_OrchestratorEndpoint, m_CoordinatorSession, m_CleanStart, m_TraceHost); + + OvermindJobInfo JobInfo; + if (!m_Client->ScheduleJob(JobJson, JobInfo)) + { + ZEN_WARN("failed to schedule Overmind job '{}'", JobName); + break; + } + + TrackedJob Tracked; + Tracked.Id = JobInfo.Id; + Tracked.Status = JobInfo.Status; + Tracked.Cores = static_cast<int>(CoresPerJob); + + { + std::lock_guard<std::mutex> Lock(m_JobsLock); + m_Jobs.push_back(std::move(Tracked)); + } + + m_EstimatedCoreCount.fetch_add(CoresPerJob); + + ZEN_INFO("Overmind job '{}' (id: {}) scheduled (estimated cores: {})", JobName, JobInfo.Id, m_EstimatedCoreCount.load()); + } +} + +void +OvermindProvisioner::PollExistingJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::PollExistingJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto& Job : m_Jobs) + { + if (m_ShouldExit.load()) + { + break; + } + + OvermindJobInfo Info; + if (!m_Client->GetJobStatus(Job.Id, Info)) + { + ZEN_DEBUG("failed to poll status for job '{}'", Job.Id); + continue; + } + + const std::string PrevStatus = Job.Status; + Job.Status = Info.Status; + + if (PrevStatus != Job.Status) + { + ZEN_INFO("Overmind job '{}' status changed: {} -> {}", Job.Id, PrevStatus, Job.Status); + + if (Job.Status == "STATUS_RUNNING" && PrevStatus != "STATUS_RUNNING") + { + m_RunningJobCount.fetch_add(1); + } + else if (Job.Status != "STATUS_RUNNING" && PrevStatus == "STATUS_RUNNING") + { + m_RunningJobCount.fetch_sub(1); + } + } + } +} + +void +OvermindProvisioner::CleanupFinishedJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::CleanupFinishedJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (auto It = m_Jobs.begin(); It != m_Jobs.end();) + { + if 
(It->Status == "STATUS_COMPLETE" || It->Status == "STATUS_ERROR") + { + ZEN_INFO("Overmind job '{}' finished (status: {}), removing from tracked jobs", It->Id, It->Status); + m_EstimatedCoreCount.fetch_sub(static_cast<uint32_t>(It->Cores)); + It = m_Jobs.erase(It); + } + else + { + ++It; + } + } +} + +void +OvermindProvisioner::CancelAllJobs() +{ + ZEN_TRACE_CPU("OvermindProvisioner::CancelAllJobs"); + + std::lock_guard<std::mutex> Lock(m_JobsLock); + + for (const auto& Job : m_Jobs) + { + ZEN_INFO("cancelling Overmind job '{}' during shutdown", Job.Id); + m_Client->CancelJob(Job.Id); + } + + m_Jobs.clear(); + m_EstimatedCoreCount.store(0); + m_RunningJobCount.store(0); +} + +} // namespace zen::overmind |