// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen::nomad { namespace { HttpClient::KeyValueMap MakeNomadHeaders(const NomadConfig& Config) { HttpClient::KeyValueMap Headers; if (!Config.AclToken.empty()) { Headers->emplace("X-Nomad-Token", Config.AclToken); } return Headers; } } // namespace NomadClient::NomadClient(const NomadConfig& Config) : m_Config(Config), m_Log(zen::logging::Get("nomad.client")) { } NomadClient::~NomadClient() = default; bool NomadClient::Initialize() { ZEN_TRACE_CPU("NomadClient::Initialize"); HttpClientSettings Settings; Settings.LogCategory = "nomad.http"; Settings.ConnectTimeout = std::chrono::milliseconds{10000}; Settings.Timeout = std::chrono::milliseconds{60000}; Settings.RetryCount = 1; // Ensure the base URL ends with a slash so path concatenation works correctly std::string BaseUrl = m_Config.ServerUrl; if (!BaseUrl.empty() && BaseUrl.back() != '/') { BaseUrl += '/'; } m_Http = std::make_unique(BaseUrl, Settings); return true; } std::string NomadClient::BuildJobJson(const std::string& JobId, const std::string& OrchestratorEndpoint) const { ZEN_TRACE_CPU("NomadClient::BuildJobJson"); // Build the task config based on driver and distribution mode json11::Json::object TaskConfig; if (m_Config.TaskDriver == Driver::RawExec) { std::string Command; if (m_Config.BinDistribution == BinaryDistribution::PreDeployed) { Command = m_Config.BinaryPath; } else { // Artifact mode: binary is downloaded to local/zenserver Command = "local/zenserver"; } TaskConfig["command"] = Command; json11::Json::array Args; Args.push_back("compute"); Args.push_back("--http=asio"); if (!OrchestratorEndpoint.empty()) { ExtendableStringBuilder<256> CoordArg; CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint; Args.push_back(std::string(CoordArg.ToView())); } { ExtendableStringBuilder<128> IdArg; IdArg << "--instance-id=nomad-" << JobId; Args.push_back(std::string(IdArg.ToView())); } TaskConfig["args"] = Args; } else { // Docker driver TaskConfig["image"] = m_Config.DockerImage; json11::Json::array Args; Args.push_back("compute"); Args.push_back("--http=asio"); if (!OrchestratorEndpoint.empty()) { ExtendableStringBuilder<256> CoordArg; CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint; Args.push_back(std::string(CoordArg.ToView())); } { ExtendableStringBuilder<128> IdArg; IdArg << "--instance-id=nomad-" << JobId; Args.push_back(std::string(IdArg.ToView())); } TaskConfig["args"] = Args; } // Build resource stanza json11::Json::object Resources; Resources["CPU"] = m_Config.CpuMhz; Resources["MemoryMB"] = m_Config.MemoryMb; // Build the task json11::Json::object Task; Task["Name"] = "zenserver"; Task["Driver"] = (m_Config.TaskDriver == Driver::RawExec) ? "raw_exec" : "docker"; Task["Config"] = TaskConfig; Task["Resources"] = Resources; // Add artifact stanza if using artifact distribution if (m_Config.BinDistribution == BinaryDistribution::Artifact && !m_Config.ArtifactSource.empty()) { json11::Json::object Artifact; Artifact["GetterSource"] = m_Config.ArtifactSource; json11::Json::array Artifacts; Artifacts.push_back(Artifact); Task["Artifacts"] = Artifacts; } json11::Json::array Tasks; Tasks.push_back(Task); // Build the task group json11::Json::object Group; Group["Name"] = "zenserver-group"; Group["Count"] = 1; Group["Tasks"] = Tasks; json11::Json::array Groups; Groups.push_back(Group); // Build datacenters array json11::Json::array Datacenters; Datacenters.push_back(m_Config.Datacenter); // Build the job json11::Json::object Job; Job["ID"] = JobId; Job["Name"] = JobId; Job["Type"] = "batch"; Job["Datacenters"] = Datacenters; Job["TaskGroups"] = Groups; if (!m_Config.Namespace.empty() && m_Config.Namespace != "default") { Job["Namespace"] = m_Config.Namespace; } if (!m_Config.Region.empty()) { Job["Region"] = m_Config.Region; } // Wrap in the registration envelope json11::Json::object Root; Root["Job"] = Job; return json11::Json(Root).dump(); } bool NomadClient::SubmitJob(const std::string& JobJson, NomadJobInfo& OutJob) { ZEN_TRACE_CPU("NomadClient::SubmitJob"); const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{JobJson.data(), JobJson.size()}, ZenContentType::kJSON); const HttpClient::Response Response = m_Http->Put("v1/jobs", Payload, MakeNomadHeaders(m_Config)); if (Response.Error) { ZEN_WARN("Nomad job submit failed: {}", Response.Error->ErrorMessage); return false; } const int StatusCode = static_cast(Response.StatusCode); if (!Response.IsSuccess()) { ZEN_WARN("Nomad job submit failed with HTTP/{}", StatusCode); return false; } const std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty()) { ZEN_WARN("invalid JSON response from Nomad job submit: {}", Err); return false; } // The response contains EvalID; the job ID is what we submitted OutJob.Id = Json["JobModifyIndex"].is_number() ? OutJob.Id : ""; OutJob.Status = "pending"; ZEN_INFO("Nomad job submitted: eval_id={}", Json["EvalID"].string_value()); return true; } bool NomadClient::GetJobStatus(const std::string& JobId, NomadJobInfo& OutJob) { ZEN_TRACE_CPU("NomadClient::GetJobStatus"); ExtendableStringBuilder<128> Path; Path << "v1/job/" << JobId; const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeNomadHeaders(m_Config)); if (Response.Error) { ZEN_WARN("Nomad job status query failed for '{}': {}", JobId, Response.Error->ErrorMessage); return false; } const int StatusCode = static_cast(Response.StatusCode); if (StatusCode == 404) { ZEN_INFO("Nomad job '{}' not found", JobId); OutJob.Status = "dead"; return true; } if (!Response.IsSuccess()) { ZEN_WARN("Nomad job status query failed with HTTP/{}", StatusCode); return false; } const std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty()) { ZEN_WARN("invalid JSON in Nomad job status response: {}", Err); return false; } OutJob.Id = Json["ID"].string_value(); OutJob.Status = Json["Status"].string_value(); if (const json11::Json Desc = Json["StatusDescription"]; Desc.is_string()) { OutJob.StatusDescription = Desc.string_value(); } return true; } bool NomadClient::GetAllocations(const std::string& JobId, std::vector& OutAllocs) { ZEN_TRACE_CPU("NomadClient::GetAllocations"); ExtendableStringBuilder<128> Path; Path << "v1/job/" << JobId << "/allocations"; const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeNomadHeaders(m_Config)); if (Response.Error) { ZEN_WARN("Nomad allocation query failed for '{}': {}", JobId, Response.Error->ErrorMessage); return false; } if (!Response.IsSuccess()) { ZEN_WARN("Nomad allocation query failed with HTTP/{}", static_cast(Response.StatusCode)); return false; } const std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty()) { ZEN_WARN("invalid JSON in Nomad allocation response: {}", Err); return false; } OutAllocs.clear(); if (!Json.is_array()) { return true; } for (const json11::Json& AllocVal : Json.array_items()) { NomadAllocInfo Alloc; Alloc.Id = AllocVal["ID"].string_value(); Alloc.ClientStatus = AllocVal["ClientStatus"].string_value(); // Extract task state if available if (const json11::Json TaskStates = AllocVal["TaskStates"]; TaskStates.is_object()) { for (const auto& [TaskName, TaskState] : TaskStates.object_items()) { if (TaskState["State"].is_string()) { Alloc.TaskState = TaskState["State"].string_value(); } } } OutAllocs.push_back(std::move(Alloc)); } return true; } bool NomadClient::StopJob(const std::string& JobId) { ZEN_TRACE_CPU("NomadClient::StopJob"); ExtendableStringBuilder<128> Path; Path << "v1/job/" << JobId; const HttpClient::Response Response = m_Http->Delete(Path.ToView(), MakeNomadHeaders(m_Config)); if (Response.Error) { ZEN_WARN("Nomad job stop failed for '{}': {}", JobId, Response.Error->ErrorMessage); return false; } if (!Response.IsSuccess()) { ZEN_WARN("Nomad job stop failed with HTTP/{}", static_cast(Response.StatusCode)); return false; } ZEN_INFO("Nomad job '{}' stopped", JobId); return true; } } // namespace zen::nomad