diff options
| author | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2026-03-09 19:06:36 -0700 |
| commit | d1abc50ee9d4fb72efc646e17decafea741caa34 (patch) | |
| tree | e4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zennomad/nomadclient.cpp | |
| parent | Allow requests with invalid content-types unless specified in command line or... (diff) | |
| parent | updated chunk–block analyser (#818) (diff) | |
| download | zen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip | |
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zennomad/nomadclient.cpp')
| -rw-r--r-- | src/zennomad/nomadclient.cpp | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/src/zennomad/nomadclient.cpp b/src/zennomad/nomadclient.cpp new file mode 100644 index 000000000..9edcde125 --- /dev/null +++ b/src/zennomad/nomadclient.cpp @@ -0,0 +1,366 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/memoryview.h> +#include <zencore/trace.h> +#include <zenhttp/httpclient.h> +#include <zennomad/nomadclient.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::nomad { + +namespace { + + HttpClient::KeyValueMap MakeNomadHeaders(const NomadConfig& Config) + { + HttpClient::KeyValueMap Headers; + if (!Config.AclToken.empty()) + { + Headers->emplace("X-Nomad-Token", Config.AclToken); + } + return Headers; + } + +} // namespace + +NomadClient::NomadClient(const NomadConfig& Config) : m_Config(Config), m_Log(zen::logging::Get("nomad.client")) +{ +} + +NomadClient::~NomadClient() = default; + +bool +NomadClient::Initialize() +{ + ZEN_TRACE_CPU("NomadClient::Initialize"); + + HttpClientSettings Settings; + Settings.LogCategory = "nomad.http"; + Settings.ConnectTimeout = std::chrono::milliseconds{10000}; + Settings.Timeout = std::chrono::milliseconds{60000}; + Settings.RetryCount = 1; + + // Ensure the base URL ends with a slash so path concatenation works correctly + std::string BaseUrl = m_Config.ServerUrl; + if (!BaseUrl.empty() && BaseUrl.back() != '/') + { + BaseUrl += '/'; + } + + m_Http = std::make_unique<zen::HttpClient>(BaseUrl, Settings); + + return true; +} + +std::string +NomadClient::BuildJobJson(const std::string& JobId, const std::string& OrchestratorEndpoint) const +{ + ZEN_TRACE_CPU("NomadClient::BuildJobJson"); + + // Build the task config based on driver and distribution mode + json11::Json::object TaskConfig; + + if (m_Config.TaskDriver == Driver::RawExec) + { + std::string Command; + if (m_Config.BinDistribution == BinaryDistribution::PreDeployed) + { + Command = m_Config.BinaryPath; + } + else + { + // Artifact mode: binary is downloaded to local/zenserver + Command = "local/zenserver"; + } + + TaskConfig["command"] = Command; + + json11::Json::array Args; + Args.push_back("compute"); + Args.push_back("--http=asio"); + if (!OrchestratorEndpoint.empty()) + { + ExtendableStringBuilder<256> CoordArg; + CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint; + Args.push_back(std::string(CoordArg.ToView())); + } + { + ExtendableStringBuilder<128> IdArg; + IdArg << "--instance-id=nomad-" << JobId; + Args.push_back(std::string(IdArg.ToView())); + } + TaskConfig["args"] = Args; + } + else + { + // Docker driver + TaskConfig["image"] = m_Config.DockerImage; + + json11::Json::array Args; + Args.push_back("compute"); + Args.push_back("--http=asio"); + if (!OrchestratorEndpoint.empty()) + { + ExtendableStringBuilder<256> CoordArg; + CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint; + Args.push_back(std::string(CoordArg.ToView())); + } + { + ExtendableStringBuilder<128> IdArg; + IdArg << "--instance-id=nomad-" << JobId; + Args.push_back(std::string(IdArg.ToView())); + } + TaskConfig["args"] = Args; + } + + // Build resource stanza + json11::Json::object Resources; + Resources["CPU"] = m_Config.CpuMhz; + Resources["MemoryMB"] = m_Config.MemoryMb; + + // Build the task + json11::Json::object Task; + Task["Name"] = "zenserver"; + Task["Driver"] = (m_Config.TaskDriver == Driver::RawExec) ? "raw_exec" : "docker"; + Task["Config"] = TaskConfig; + Task["Resources"] = Resources; + + // Add artifact stanza if using artifact distribution + if (m_Config.BinDistribution == BinaryDistribution::Artifact && !m_Config.ArtifactSource.empty()) + { + json11::Json::object Artifact; + Artifact["GetterSource"] = m_Config.ArtifactSource; + + json11::Json::array Artifacts; + Artifacts.push_back(Artifact); + Task["Artifacts"] = Artifacts; + } + + json11::Json::array Tasks; + Tasks.push_back(Task); + + // Build the task group + json11::Json::object Group; + Group["Name"] = "zenserver-group"; + Group["Count"] = 1; + Group["Tasks"] = Tasks; + + json11::Json::array Groups; + Groups.push_back(Group); + + // Build datacenters array + json11::Json::array Datacenters; + Datacenters.push_back(m_Config.Datacenter); + + // Build the job + json11::Json::object Job; + Job["ID"] = JobId; + Job["Name"] = JobId; + Job["Type"] = "batch"; + Job["Datacenters"] = Datacenters; + Job["TaskGroups"] = Groups; + + if (!m_Config.Namespace.empty() && m_Config.Namespace != "default") + { + Job["Namespace"] = m_Config.Namespace; + } + + if (!m_Config.Region.empty()) + { + Job["Region"] = m_Config.Region; + } + + // Wrap in the registration envelope + json11::Json::object Root; + Root["Job"] = Job; + + return json11::Json(Root).dump(); +} + +bool +NomadClient::SubmitJob(const std::string& JobJson, NomadJobInfo& OutJob) +{ + ZEN_TRACE_CPU("NomadClient::SubmitJob"); + + const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{JobJson.data(), JobJson.size()}, ZenContentType::kJSON); + + const HttpClient::Response Response = m_Http->Put("v1/jobs", Payload, MakeNomadHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Nomad job submit failed: {}", Response.Error->ErrorMessage); + return false; + } + + const int StatusCode = static_cast<int>(Response.StatusCode); + + if (!Response.IsSuccess()) + { + ZEN_WARN("Nomad job submit failed with HTTP/{}", StatusCode); + return false; + } + + const std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("invalid JSON response from Nomad job submit: {}", Err); + return false; + } + + // The response contains EvalID; the job ID is what we submitted + OutJob.Id = Json["JobModifyIndex"].is_number() ? OutJob.Id : ""; + OutJob.Status = "pending"; + + ZEN_INFO("Nomad job submitted: eval_id={}", Json["EvalID"].string_value()); + + return true; +} + +bool +NomadClient::GetJobStatus(const std::string& JobId, NomadJobInfo& OutJob) +{ + ZEN_TRACE_CPU("NomadClient::GetJobStatus"); + + ExtendableStringBuilder<128> Path; + Path << "v1/job/" << JobId; + + const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeNomadHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Nomad job status query failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return false; + } + + const int StatusCode = static_cast<int>(Response.StatusCode); + + if (StatusCode == 404) + { + ZEN_INFO("Nomad job '{}' not found", JobId); + OutJob.Status = "dead"; + return true; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Nomad job status query failed with HTTP/{}", StatusCode); + return false; + } + + const std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("invalid JSON in Nomad job status response: {}", Err); + return false; + } + + OutJob.Id = Json["ID"].string_value(); + OutJob.Status = Json["Status"].string_value(); + if (const json11::Json Desc = Json["StatusDescription"]; Desc.is_string()) + { + OutJob.StatusDescription = Desc.string_value(); + } + + return true; +} + +bool +NomadClient::GetAllocations(const std::string& JobId, std::vector<NomadAllocInfo>& OutAllocs) +{ + ZEN_TRACE_CPU("NomadClient::GetAllocations"); + + ExtendableStringBuilder<128> Path; + Path << "v1/job/" << JobId << "/allocations"; + + const HttpClient::Response Response = m_Http->Get(Path.ToView(), MakeNomadHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Nomad allocation query failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return false; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Nomad allocation query failed with HTTP/{}", static_cast<int>(Response.StatusCode)); + return false; + } + + const std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("invalid JSON in Nomad allocation response: {}", Err); + return false; + } + + OutAllocs.clear(); + if (!Json.is_array()) + { + return true; + } + + for (const json11::Json& AllocVal : Json.array_items()) + { + NomadAllocInfo Alloc; + Alloc.Id = AllocVal["ID"].string_value(); + Alloc.ClientStatus = AllocVal["ClientStatus"].string_value(); + + // Extract task state if available + if (const json11::Json TaskStates = AllocVal["TaskStates"]; TaskStates.is_object()) + { + for (const auto& [TaskName, TaskState] : TaskStates.object_items()) + { + if (TaskState["State"].is_string()) + { + Alloc.TaskState = TaskState["State"].string_value(); + } + } + } + + OutAllocs.push_back(std::move(Alloc)); + } + + return true; +} + +bool +NomadClient::StopJob(const std::string& JobId) +{ + ZEN_TRACE_CPU("NomadClient::StopJob"); + + ExtendableStringBuilder<128> Path; + Path << "v1/job/" << JobId; + + const HttpClient::Response Response = m_Http->Delete(Path.ToView(), MakeNomadHeaders(m_Config)); + + if (Response.Error) + { + ZEN_WARN("Nomad job stop failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return false; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("Nomad job stop failed with HTTP/{}", static_cast<int>(Response.StatusCode)); + return false; + } + + ZEN_INFO("Nomad job '{}' stopped", JobId); + return true; +} + +} // namespace zen::nomad |