aboutsummaryrefslogtreecommitdiff
path: root/src/zennomad/nomadprocess.cpp
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zennomad/nomadprocess.cpp
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zennomad/nomadprocess.cpp')
-rw-r--r--src/zennomad/nomadprocess.cpp354
1 files changed, 354 insertions, 0 deletions
diff --git a/src/zennomad/nomadprocess.cpp b/src/zennomad/nomadprocess.cpp
new file mode 100644
index 000000000..1ae968fb7
--- /dev/null
+++ b/src/zennomad/nomadprocess.cpp
@@ -0,0 +1,354 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zennomad/nomadclient.h>
+#include <zennomad/nomadprocess.h>
+
+#include <zenbase/zenbase.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zencore/memoryview.h>
+#include <zencore/process.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <fmt/format.h>
+
+namespace zen::nomad {
+
+//////////////////////////////////////////////////////////////////////////
+
+struct NomadProcess::Impl
+{
+ Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {}
+ ~Impl() = default;
+
+ void SpawnNomadAgent()
+ {
+ ZEN_TRACE_CPU("SpawnNomadAgent");
+
+ if (m_ProcessHandle.IsValid())
+ {
+ return;
+ }
+
+ CreateProcOptions Options;
+ Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup;
+
+ CreateProcResult Result = CreateProc("nomad" ZEN_EXE_SUFFIX_LITERAL, "nomad" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
+
+ if (Result)
+ {
+ m_ProcessHandle.Initialize(Result);
+
+ Stopwatch Timer;
+
+ // Poll to check when the agent is ready
+
+ do
+ {
+ Sleep(100);
+ HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader");
+ if (Resp)
+ {
+ ZEN_INFO("Nomad agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return;
+ }
+ } while (Timer.GetElapsedTimeMs() < 30000);
+ }
+
+ // Report failure!
+
+ ZEN_WARN("Nomad agent failed to start within timeout period");
+ }
+
+ void StopNomadAgent()
+ {
+ if (!m_ProcessHandle.IsValid())
+ {
+ return;
+ }
+
+ // This waits for the process to exit and also resets the handle
+ m_ProcessHandle.Kill();
+ }
+
+private:
+ ProcessHandle m_ProcessHandle;
+ HttpClient m_HttpClient;
+};
+
+NomadProcess::NomadProcess() : m_Impl(std::make_unique<Impl>("http://localhost:4646/"))
+{
+}
+
+NomadProcess::~NomadProcess()
+{
+}
+
+void
+NomadProcess::SpawnNomadAgent()
+{
+ m_Impl->SpawnNomadAgent();
+}
+
+void
+NomadProcess::StopNomadAgent()
+{
+ m_Impl->StopNomadAgent();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+NomadTestClient::NomadTestClient(std::string_view BaseUri) : m_HttpClient(BaseUri)
+{
+}
+
+NomadTestClient::~NomadTestClient()
+{
+}
+
+NomadJobInfo
+NomadTestClient::SubmitJob(std::string_view JobId, std::string_view Command, const std::vector<std::string>& Args)
+{
+ ZEN_TRACE_CPU("SubmitNomadJob");
+
+ NomadJobInfo Result;
+
+ // Build the job JSON for a raw_exec batch job
+ json11::Json::object TaskConfig;
+ TaskConfig["command"] = std::string(Command);
+
+ json11::Json::array JsonArgs;
+ for (const auto& Arg : Args)
+ {
+ JsonArgs.push_back(Arg);
+ }
+ TaskConfig["args"] = JsonArgs;
+
+ json11::Json::object Resources;
+ Resources["CPU"] = 100;
+ Resources["MemoryMB"] = 64;
+
+ json11::Json::object Task;
+ Task["Name"] = "test-task";
+ Task["Driver"] = "raw_exec";
+ Task["Config"] = TaskConfig;
+ Task["Resources"] = Resources;
+
+ json11::Json::array Tasks;
+ Tasks.push_back(Task);
+
+ json11::Json::object Group;
+ Group["Name"] = "test-group";
+ Group["Count"] = 1;
+ Group["Tasks"] = Tasks;
+
+ json11::Json::array Groups;
+ Groups.push_back(Group);
+
+ json11::Json::array Datacenters;
+ Datacenters.push_back("dc1");
+
+ json11::Json::object Job;
+ Job["ID"] = std::string(JobId);
+ Job["Name"] = std::string(JobId);
+ Job["Type"] = "batch";
+ Job["Datacenters"] = Datacenters;
+ Job["TaskGroups"] = Groups;
+
+ json11::Json::object Root;
+ Root["Job"] = Job;
+
+ std::string Body = json11::Json(Root).dump();
+
+ IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{Body.data(), Body.size()}, ZenContentType::kJSON);
+
+ HttpClient::Response Response =
+ m_HttpClient.Put("v1/jobs", Payload, {{"Content-Type", "application/json"}, {"Accept", "application/json"}});
+
+ if (!Response || !Response.IsSuccess())
+ {
+ ZEN_WARN("NomadTestClient: SubmitJob failed for '{}'", JobId);
+ return Result;
+ }
+
+ std::string ResponseBody(Response.AsText());
+ std::string Err;
+ const json11::Json Json = json11::Json::parse(ResponseBody, Err);
+
+ if (!Err.empty())
+ {
+ ZEN_WARN("NomadTestClient: invalid JSON in SubmitJob response: {}", Err);
+ return Result;
+ }
+
+ Result.Id = std::string(JobId);
+ Result.Status = "pending";
+
+ ZEN_INFO("NomadTestClient: job '{}' submitted (eval_id={})", JobId, Json["EvalID"].string_value());
+
+ return Result;
+}
+
+NomadJobInfo
+NomadTestClient::GetJobStatus(std::string_view JobId)
+{
+ ZEN_TRACE_CPU("GetNomadJobStatus");
+
+ NomadJobInfo Result;
+
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}", JobId));
+
+ if (Response.Error)
+ {
+ ZEN_WARN("NomadTestClient: GetJobStatus failed for '{}': {}", JobId, Response.Error->ErrorMessage);
+ return Result;
+ }
+
+ if (static_cast<int>(Response.StatusCode) == 404)
+ {
+ Result.Status = "dead";
+ return Result;
+ }
+
+ if (!Response.IsSuccess())
+ {
+ ZEN_WARN("NomadTestClient: GetJobStatus failed with HTTP/{}", static_cast<int>(Response.StatusCode));
+ return Result;
+ }
+
+ std::string Body(Response.AsText());
+ std::string Err;
+ const json11::Json Json = json11::Json::parse(Body, Err);
+
+ if (!Err.empty())
+ {
+ ZEN_WARN("NomadTestClient: invalid JSON in GetJobStatus response: {}", Err);
+ return Result;
+ }
+
+ Result.Id = Json["ID"].string_value();
+ Result.Status = Json["Status"].string_value();
+ if (const json11::Json Desc = Json["StatusDescription"]; Desc.is_string())
+ {
+ Result.StatusDescription = Desc.string_value();
+ }
+
+ return Result;
+}
+
+void
+NomadTestClient::StopJob(std::string_view JobId)
+{
+ ZEN_TRACE_CPU("StopNomadJob");
+
+ HttpClient::Response Response = m_HttpClient.Delete(fmt::format("v1/job/{}", JobId));
+
+ if (!Response || !Response.IsSuccess())
+ {
+ ZEN_WARN("NomadTestClient: StopJob failed for '{}'", JobId);
+ return;
+ }
+
+ ZEN_INFO("NomadTestClient: job '{}' stopped", JobId);
+}
+
+std::vector<NomadAllocInfo>
+NomadTestClient::GetAllocations(std::string_view JobId)
+{
+ ZEN_TRACE_CPU("GetNomadAllocations");
+
+ std::vector<NomadAllocInfo> Allocs;
+
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}/allocations", JobId));
+
+ if (!Response || !Response.IsSuccess())
+ {
+ ZEN_WARN("NomadTestClient: GetAllocations failed for '{}'", JobId);
+ return Allocs;
+ }
+
+ std::string Body(Response.AsText());
+ std::string Err;
+ const json11::Json Json = json11::Json::parse(Body, Err);
+
+ if (!Err.empty() || !Json.is_array())
+ {
+ return Allocs;
+ }
+
+ for (const json11::Json& AllocVal : Json.array_items())
+ {
+ NomadAllocInfo Alloc;
+ Alloc.Id = AllocVal["ID"].string_value();
+ Alloc.ClientStatus = AllocVal["ClientStatus"].string_value();
+
+ if (const json11::Json TaskStates = AllocVal["TaskStates"]; TaskStates.is_object())
+ {
+ for (const auto& [TaskName, TaskState] : TaskStates.object_items())
+ {
+ if (TaskState["State"].is_string())
+ {
+ Alloc.TaskState = TaskState["State"].string_value();
+ }
+ }
+ }
+
+ Allocs.push_back(std::move(Alloc));
+ }
+
+ return Allocs;
+}
+
+std::vector<NomadJobInfo>
+NomadTestClient::ListJobs(std::string_view Prefix)
+{
+ ZEN_TRACE_CPU("ListNomadJobs");
+
+ std::vector<NomadJobInfo> Jobs;
+
+ std::string Url = "v1/jobs";
+ if (!Prefix.empty())
+ {
+ Url = fmt::format("v1/jobs?prefix={}", Prefix);
+ }
+
+ HttpClient::Response Response = m_HttpClient.Get(Url);
+
+ if (!Response || !Response.IsSuccess())
+ {
+ ZEN_WARN("NomadTestClient: ListJobs failed");
+ return Jobs;
+ }
+
+ std::string Body(Response.AsText());
+ std::string Err;
+ const json11::Json Json = json11::Json::parse(Body, Err);
+
+ if (!Err.empty() || !Json.is_array())
+ {
+ return Jobs;
+ }
+
+ for (const json11::Json& JobVal : Json.array_items())
+ {
+ NomadJobInfo Job;
+ Job.Id = JobVal["ID"].string_value();
+ Job.Status = JobVal["Status"].string_value();
+ if (const json11::Json Desc = JobVal["StatusDescription"]; Desc.is_string())
+ {
+ Job.StatusDescription = Desc.string_value();
+ }
+ Jobs.push_back(std::move(Job));
+ }
+
+ return Jobs;
+}
+
+} // namespace zen::nomad