aboutsummaryrefslogtreecommitdiff
path: root/src/zennomad/nomadclient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zennomad/nomadclient.cpp')
-rw-r--r-- src/zennomad/nomadclient.cpp 366
1 files changed, 366 insertions, 0 deletions
diff --git a/src/zennomad/nomadclient.cpp b/src/zennomad/nomadclient.cpp
new file mode 100644
index 000000000..9edcde125
--- /dev/null
+++ b/src/zennomad/nomadclient.cpp
@@ -0,0 +1,366 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zencore/memoryview.h>
+#include <zencore/trace.h>
+#include <zenhttp/httpclient.h>
+#include <zennomad/nomadclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::nomad {
+
+namespace {
+
+    // Builds the HTTP headers attached to every Nomad API request.
+    //
+    //  - "X-Nomad-Token" carries the ACL token when one is configured.
+    //  - "X-Nomad-Namespace" selects the configured namespace when it is set to
+    //    something other than "default". BuildJobJson registers the job in that
+    //    namespace, so the status/allocation/stop requests have to address the
+    //    same namespace; otherwise Nomad looks the job ID up in "default".
+    //
+    // NOTE(review): Config.Region is not forwarded here. Nomad appears to take the
+    // region as a "region" query parameter rather than a header - confirm whether
+    // the per-job requests need it when a region is configured.
+    HttpClient::KeyValueMap MakeNomadHeaders(const NomadConfig& Config)
+    {
+        HttpClient::KeyValueMap Headers;
+        if (!Config.AclToken.empty())
+        {
+            Headers->emplace("X-Nomad-Token", Config.AclToken);
+        }
+        // Same condition BuildJobJson uses when writing Job["Namespace"]
+        if (!Config.Namespace.empty() && Config.Namespace != "default")
+        {
+            Headers->emplace("X-Nomad-Namespace", Config.Namespace);
+        }
+        return Headers;
+    }
+
+} // namespace
+
+// Captures the configuration in m_Config and acquires the "nomad.client" logger.
+// The HTTP client is not created here; that is done by Initialize().
+NomadClient::NomadClient(const NomadConfig& Config)
+: m_Config(Config)
+, m_Log(zen::logging::Get("nomad.client"))
+{
+}
+
+// Destructor is defaulted here, in the implementation file.
+NomadClient::~NomadClient() = default;
+
+// Creates the HTTP client (m_Http) used by all Nomad API calls in this class.
+//
+// The client targets m_Config.ServerUrl, normalised to end with '/', and is
+// configured with a 10000 ms connect timeout, a 60000 ms request timeout and a
+// retry count of 1. Always returns true.
+bool
+NomadClient::Initialize()
+{
+    ZEN_TRACE_CPU("NomadClient::Initialize");
+
+    // Request paths are relative ("v1/jobs", "v1/job/<id>", ...), so a non-empty
+    // base URL must end with a slash for concatenation to work
+    std::string ServerBase = m_Config.ServerUrl;
+    const bool  NeedsSlash = !ServerBase.empty() && ServerBase.back() != '/';
+    if (NeedsSlash)
+    {
+        ServerBase += '/';
+    }
+
+    HttpClientSettings ClientSettings;
+    ClientSettings.LogCategory    = "nomad.http";
+    ClientSettings.ConnectTimeout = std::chrono::milliseconds{10000};
+    ClientSettings.Timeout        = std::chrono::milliseconds{60000};
+    ClientSettings.RetryCount     = 1;
+
+    m_Http = std::make_unique<zen::HttpClient>(ServerBase, ClientSettings);
+
+    return true;
+}
+
+// Builds the JSON body used to register a single-group, single-task batch job.
+//
+// JobId                 Written as both the job "ID" and "Name", and passed to the
+//                       task as "--instance-id=nomad-<JobId>".
+// OrchestratorEndpoint  When non-empty, passed to the task as
+//                       "--coordinator-endpoint=<OrchestratorEndpoint>".
+//
+// Driver, binary location, resources, datacenter, namespace and region are taken
+// from m_Config. Returns the serialized document, wrapped as {"Job": {...}}.
+std::string
+NomadClient::BuildJobJson(const std::string& JobId, const std::string& OrchestratorEndpoint) const
+{
+    ZEN_TRACE_CPU("NomadClient::BuildJobJson");
+
+    const bool IsRawExec = (m_Config.TaskDriver == Driver::RawExec);
+
+    // The argument list is the same for every driver, so build it once
+    json11::Json::array Args;
+    Args.push_back("compute");
+    Args.push_back("--http=asio");
+    if (!OrchestratorEndpoint.empty())
+    {
+        ExtendableStringBuilder<256> CoordArg;
+        CoordArg << "--coordinator-endpoint=" << OrchestratorEndpoint;
+        Args.push_back(std::string(CoordArg.ToView()));
+    }
+    {
+        ExtendableStringBuilder<128> IdArg;
+        IdArg << "--instance-id=nomad-" << JobId;
+        Args.push_back(std::string(IdArg.ToView()));
+    }
+
+    // Build the task config based on driver and distribution mode
+    json11::Json::object TaskConfig;
+    if (IsRawExec)
+    {
+        std::string Command;
+        if (m_Config.BinDistribution == BinaryDistribution::PreDeployed)
+        {
+            Command = m_Config.BinaryPath;
+        }
+        else
+        {
+            // Artifact mode: binary is downloaded to local/zenserver
+            Command = "local/zenserver";
+        }
+        TaskConfig["command"] = Command;
+    }
+    else
+    {
+        // Docker driver
+        TaskConfig["image"] = m_Config.DockerImage;
+    }
+    TaskConfig["args"] = Args;
+
+    // Build resource stanza
+    json11::Json::object Resources;
+    Resources["CPU"]      = m_Config.CpuMhz;
+    Resources["MemoryMB"] = m_Config.MemoryMb;
+
+    // Build the task
+    json11::Json::object Task;
+    Task["Name"]      = "zenserver";
+    Task["Driver"]    = IsRawExec ? "raw_exec" : "docker";
+    Task["Config"]    = TaskConfig;
+    Task["Resources"] = Resources;
+
+    // Add artifact stanza if using artifact distribution
+    if (m_Config.BinDistribution == BinaryDistribution::Artifact && !m_Config.ArtifactSource.empty())
+    {
+        json11::Json::object Artifact;
+        Artifact["GetterSource"] = m_Config.ArtifactSource;
+
+        json11::Json::array Artifacts;
+        Artifacts.push_back(Artifact);
+        Task["Artifacts"] = Artifacts;
+    }
+
+    json11::Json::array Tasks;
+    Tasks.push_back(Task);
+
+    // Build the task group
+    json11::Json::object Group;
+    Group["Name"]  = "zenserver-group";
+    Group["Count"] = 1;
+    Group["Tasks"] = Tasks;
+
+    json11::Json::array Groups;
+    Groups.push_back(Group);
+
+    // Build datacenters array
+    json11::Json::array Datacenters;
+    Datacenters.push_back(m_Config.Datacenter);
+
+    // Build the job
+    json11::Json::object Job;
+    Job["ID"]          = JobId;
+    Job["Name"]        = JobId;
+    Job["Type"]        = "batch";
+    Job["Datacenters"] = Datacenters;
+    Job["TaskGroups"]  = Groups;
+
+    // Namespace is only written when it differs from "default"
+    if (!m_Config.Namespace.empty() && m_Config.Namespace != "default")
+    {
+        Job["Namespace"] = m_Config.Namespace;
+    }
+
+    if (!m_Config.Region.empty())
+    {
+        Job["Region"] = m_Config.Region;
+    }
+
+    // Wrap in the registration envelope
+    json11::Json::object Root;
+    Root["Job"] = Job;
+
+    return json11::Json(Root).dump();
+}
+
+// Registers a job with Nomad by sending JobJson as a PUT to "v1/jobs".
+//
+// JobJson  Request body, sent unmodified with JSON content type.
+// OutJob   On success Status is set to "pending". Id is left untouched when the
+//          response contains a numeric "JobModifyIndex" and is cleared otherwise.
+//          NOTE(review): callers are presumably expected to pre-populate Id with
+//          the submitted job ID - confirm against the call sites.
+//
+// Returns false (after logging a warning) on a transport error, a non-success
+// HTTP status, or a response body that is not valid JSON; true otherwise.
+bool
+NomadClient::SubmitJob(const std::string& JobJson, NomadJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("NomadClient::SubmitJob");
+
+    const MemoryView JobBytes{JobJson.data(), JobJson.size()};
+    const IoBuffer   RequestBody = IoBufferBuilder::MakeFromMemory(JobBytes, ZenContentType::kJSON);
+
+    const HttpClient::Response Reply = m_Http->Put("v1/jobs", RequestBody, MakeNomadHeaders(m_Config));
+
+    if (Reply.Error)
+    {
+        ZEN_WARN("Nomad job submit failed: {}", Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    if (!Reply.IsSuccess())
+    {
+        ZEN_WARN("Nomad job submit failed with HTTP/{}", static_cast<int>(Reply.StatusCode));
+        return false;
+    }
+
+    std::string        ParseError;
+    const std::string  ReplyText(Reply.AsText());
+    const json11::Json Doc = json11::Json::parse(ReplyText, ParseError);
+
+    if (!ParseError.empty())
+    {
+        ZEN_WARN("invalid JSON response from Nomad job submit: {}", ParseError);
+        return false;
+    }
+
+    // The response contains EvalID, not the job ID; keep whatever Id the caller
+    // supplied unless the response lacks a numeric JobModifyIndex
+    if (!Doc["JobModifyIndex"].is_number())
+    {
+        OutJob.Id = "";
+    }
+    OutJob.Status = "pending";
+
+    ZEN_INFO("Nomad job submitted: eval_id={}", Doc["EvalID"].string_value());
+
+    return true;
+}
+
+// Queries "v1/job/<JobId>" and copies the job's state into OutJob.
+//
+// JobId   Job identifier appended verbatim to the request path.
+// OutJob  On a successful response receives "ID" and "Status" from the JSON
+//         body; StatusDescription is only overwritten when the response holds
+//         a string "StatusDescription". On HTTP 404 only Status is written
+//         (set to "dead").
+//
+// Returns true when the job was read or reported as not found (404). Returns
+// false, after logging a warning, on a transport error, any other non-success
+// HTTP status, or a body that is not valid JSON.
+bool
+NomadClient::GetJobStatus(const std::string& JobId, NomadJobInfo& OutJob)
+{
+    ZEN_TRACE_CPU("NomadClient::GetJobStatus");
+
+    ExtendableStringBuilder<128> JobPath;
+    JobPath << "v1/job/" << JobId;
+
+    const HttpClient::Response Reply = m_Http->Get(JobPath.ToView(), MakeNomadHeaders(m_Config));
+
+    if (Reply.Error)
+    {
+        ZEN_WARN("Nomad job status query failed for '{}': {}", JobId, Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    const int HttpStatus = static_cast<int>(Reply.StatusCode);
+
+    // A missing job is reported as "dead" rather than as a query failure
+    if (HttpStatus == 404)
+    {
+        ZEN_INFO("Nomad job '{}' not found", JobId);
+        OutJob.Status = "dead";
+        return true;
+    }
+
+    if (!Reply.IsSuccess())
+    {
+        ZEN_WARN("Nomad job status query failed with HTTP/{}", HttpStatus);
+        return false;
+    }
+
+    std::string        ParseError;
+    const std::string  ReplyText(Reply.AsText());
+    const json11::Json Doc = json11::Json::parse(ReplyText, ParseError);
+
+    if (!ParseError.empty())
+    {
+        ZEN_WARN("invalid JSON in Nomad job status response: {}", ParseError);
+        return false;
+    }
+
+    OutJob.Id     = Doc["ID"].string_value();
+    OutJob.Status = Doc["Status"].string_value();
+
+    const json11::Json& Description = Doc["StatusDescription"];
+    if (Description.is_string())
+    {
+        OutJob.StatusDescription = Description.string_value();
+    }
+
+    return true;
+}
+
+// Queries "v1/job/<JobId>/allocations" and fills OutAllocs with one entry per
+// allocation in the response.
+//
+// JobId      Job identifier appended verbatim to the request path.
+// OutAllocs  Cleared once the response has been parsed, then populated with the
+//            "ID" and "ClientStatus" of each allocation. TaskState is taken from
+//            the "State" string of the allocation's "TaskStates" entries; when
+//            several entries carry a state, the last one visited wins.
+//
+// Returns true when the response was parsed (including a non-array body, which
+// yields an empty list). Returns false, leaving OutAllocs untouched, on a
+// transport error, a non-success HTTP status, or invalid JSON.
+bool
+NomadClient::GetAllocations(const std::string& JobId, std::vector<NomadAllocInfo>& OutAllocs)
+{
+    ZEN_TRACE_CPU("NomadClient::GetAllocations");
+
+    ExtendableStringBuilder<128> AllocPath;
+    AllocPath << "v1/job/" << JobId << "/allocations";
+
+    const HttpClient::Response Reply = m_Http->Get(AllocPath.ToView(), MakeNomadHeaders(m_Config));
+
+    if (Reply.Error)
+    {
+        ZEN_WARN("Nomad allocation query failed for '{}': {}", JobId, Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    if (!Reply.IsSuccess())
+    {
+        ZEN_WARN("Nomad allocation query failed with HTTP/{}", static_cast<int>(Reply.StatusCode));
+        return false;
+    }
+
+    std::string        ParseError;
+    const std::string  ReplyText(Reply.AsText());
+    const json11::Json Doc = json11::Json::parse(ReplyText, ParseError);
+
+    if (!ParseError.empty())
+    {
+        ZEN_WARN("invalid JSON in Nomad allocation response: {}", ParseError);
+        return false;
+    }
+
+    OutAllocs.clear();
+
+    if (Doc.is_array())
+    {
+        for (const json11::Json& Entry : Doc.array_items())
+        {
+            NomadAllocInfo Info;
+            Info.Id           = Entry["ID"].string_value();
+            Info.ClientStatus = Entry["ClientStatus"].string_value();
+
+            // Extract task state if available
+            const json11::Json& States = Entry["TaskStates"];
+            if (States.is_object())
+            {
+                for (const auto& NameAndState : States.object_items())
+                {
+                    const json11::Json& State = NameAndState.second["State"];
+                    if (State.is_string())
+                    {
+                        Info.TaskState = State.string_value();
+                    }
+                }
+            }
+
+            OutAllocs.push_back(std::move(Info));
+        }
+    }
+
+    return true;
+}
+
+// Stops a job by issuing a DELETE to "v1/job/<JobId>".
+//
+// JobId  Job identifier appended verbatim to the request path.
+//
+// Returns true when Nomad answered with a success status. Returns false, after
+// logging a warning, on a transport error or a non-success HTTP status.
+bool
+NomadClient::StopJob(const std::string& JobId)
+{
+    ZEN_TRACE_CPU("NomadClient::StopJob");
+
+    ExtendableStringBuilder<128> JobPath;
+    JobPath << "v1/job/" << JobId;
+
+    const HttpClient::Response Reply = m_Http->Delete(JobPath.ToView(), MakeNomadHeaders(m_Config));
+
+    if (Reply.Error)
+    {
+        ZEN_WARN("Nomad job stop failed for '{}': {}", JobId, Reply.Error->ErrorMessage);
+        return false;
+    }
+
+    const bool Succeeded = Reply.IsSuccess();
+    if (Succeeded)
+    {
+        ZEN_INFO("Nomad job '{}' stopped", JobId);
+    }
+    else
+    {
+        ZEN_WARN("Nomad job stop failed with HTTP/{}", static_cast<int>(Reply.StatusCode));
+    }
+    return Succeeded;
+}
+
+} // namespace zen::nomad