diff options
Diffstat (limited to 'src/zennomad/nomadprocess.cpp')
| -rw-r--r-- | src/zennomad/nomadprocess.cpp | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/src/zennomad/nomadprocess.cpp b/src/zennomad/nomadprocess.cpp new file mode 100644 index 000000000..1ae968fb7 --- /dev/null +++ b/src/zennomad/nomadprocess.cpp @@ -0,0 +1,354 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zennomad/nomadclient.h> +#include <zennomad/nomadprocess.h> + +#include <zenbase/zenbase.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/memoryview.h> +#include <zencore/process.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <fmt/format.h> + +namespace zen::nomad { + +////////////////////////////////////////////////////////////////////////// + +struct NomadProcess::Impl +{ + Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {} + ~Impl() = default; + + void SpawnNomadAgent() + { + ZEN_TRACE_CPU("SpawnNomadAgent"); + + if (m_ProcessHandle.IsValid()) + { + return; + } + + CreateProcOptions Options; + Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; + + CreateProcResult Result = CreateProc("nomad" ZEN_EXE_SUFFIX_LITERAL, "nomad" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); + + if (Result) + { + m_ProcessHandle.Initialize(Result); + + Stopwatch Timer; + + // Poll to check when the agent is ready + + do + { + Sleep(100); + HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader"); + if (Resp) + { + ZEN_INFO("Nomad agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return; + } + } while (Timer.GetElapsedTimeMs() < 30000); + } + + // Report failure! + + ZEN_WARN("Nomad agent failed to start within timeout period"); + } + + void StopNomadAgent() + { + if (!m_ProcessHandle.IsValid()) + { + return; + } + + // This waits for the process to exit and also resets the handle + m_ProcessHandle.Kill(); + } + +private: + ProcessHandle m_ProcessHandle; + HttpClient m_HttpClient; +}; + +NomadProcess::NomadProcess() : m_Impl(std::make_unique<Impl>("http://localhost:4646/")) +{ +} + +NomadProcess::~NomadProcess() +{ +} + +void +NomadProcess::SpawnNomadAgent() +{ + m_Impl->SpawnNomadAgent(); +} + +void +NomadProcess::StopNomadAgent() +{ + m_Impl->StopNomadAgent(); +} + +////////////////////////////////////////////////////////////////////////// + +NomadTestClient::NomadTestClient(std::string_view BaseUri) : m_HttpClient(BaseUri) +{ +} + +NomadTestClient::~NomadTestClient() +{ +} + +NomadJobInfo +NomadTestClient::SubmitJob(std::string_view JobId, std::string_view Command, const std::vector<std::string>& Args) +{ + ZEN_TRACE_CPU("SubmitNomadJob"); + + NomadJobInfo Result; + + // Build the job JSON for a raw_exec batch job + json11::Json::object TaskConfig; + TaskConfig["command"] = std::string(Command); + + json11::Json::array JsonArgs; + for (const auto& Arg : Args) + { + JsonArgs.push_back(Arg); + } + TaskConfig["args"] = JsonArgs; + + json11::Json::object Resources; + Resources["CPU"] = 100; + Resources["MemoryMB"] = 64; + + json11::Json::object Task; + Task["Name"] = "test-task"; + Task["Driver"] = "raw_exec"; + Task["Config"] = TaskConfig; + Task["Resources"] = Resources; + + json11::Json::array Tasks; + Tasks.push_back(Task); + + json11::Json::object Group; + Group["Name"] = "test-group"; + Group["Count"] = 1; + Group["Tasks"] = Tasks; + + json11::Json::array Groups; + Groups.push_back(Group); + + json11::Json::array Datacenters; + Datacenters.push_back("dc1"); + + json11::Json::object Job; + Job["ID"] = std::string(JobId); + Job["Name"] = std::string(JobId); + Job["Type"] = "batch"; + Job["Datacenters"] = Datacenters; + Job["TaskGroups"] = Groups; + + json11::Json::object Root; + Root["Job"] = Job; + + std::string Body = json11::Json(Root).dump(); + + IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{Body.data(), Body.size()}, ZenContentType::kJSON); + + HttpClient::Response Response = + m_HttpClient.Put("v1/jobs", Payload, {{"Content-Type", "application/json"}, {"Accept", "application/json"}}); + + if (!Response || !Response.IsSuccess()) + { + ZEN_WARN("NomadTestClient: SubmitJob failed for '{}'", JobId); + return Result; + } + + std::string ResponseBody(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(ResponseBody, Err); + + if (!Err.empty()) + { + ZEN_WARN("NomadTestClient: invalid JSON in SubmitJob response: {}", Err); + return Result; + } + + Result.Id = std::string(JobId); + Result.Status = "pending"; + + ZEN_INFO("NomadTestClient: job '{}' submitted (eval_id={})", JobId, Json["EvalID"].string_value()); + + return Result; +} + +NomadJobInfo +NomadTestClient::GetJobStatus(std::string_view JobId) +{ + ZEN_TRACE_CPU("GetNomadJobStatus"); + + NomadJobInfo Result; + + HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}", JobId)); + + if (Response.Error) + { + ZEN_WARN("NomadTestClient: GetJobStatus failed for '{}': {}", JobId, Response.Error->ErrorMessage); + return Result; + } + + if (static_cast<int>(Response.StatusCode) == 404) + { + Result.Status = "dead"; + return Result; + } + + if (!Response.IsSuccess()) + { + ZEN_WARN("NomadTestClient: GetJobStatus failed with HTTP/{}", static_cast<int>(Response.StatusCode)); + return Result; + } + + std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty()) + { + ZEN_WARN("NomadTestClient: invalid JSON in GetJobStatus response: {}", Err); + return Result; + } + + Result.Id = Json["ID"].string_value(); + Result.Status = Json["Status"].string_value(); + if (const json11::Json Desc = Json["StatusDescription"]; Desc.is_string()) + { + Result.StatusDescription = Desc.string_value(); + } + + return Result; +} + +void +NomadTestClient::StopJob(std::string_view JobId) +{ + ZEN_TRACE_CPU("StopNomadJob"); + + HttpClient::Response Response = m_HttpClient.Delete(fmt::format("v1/job/{}", JobId)); + + if (!Response || !Response.IsSuccess()) + { + ZEN_WARN("NomadTestClient: StopJob failed for '{}'", JobId); + return; + } + + ZEN_INFO("NomadTestClient: job '{}' stopped", JobId); +} + +std::vector<NomadAllocInfo> +NomadTestClient::GetAllocations(std::string_view JobId) +{ + ZEN_TRACE_CPU("GetNomadAllocations"); + + std::vector<NomadAllocInfo> Allocs; + + HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}/allocations", JobId)); + + if (!Response || !Response.IsSuccess()) + { + ZEN_WARN("NomadTestClient: GetAllocations failed for '{}'", JobId); + return Allocs; + } + + std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty() || !Json.is_array()) + { + return Allocs; + } + + for (const json11::Json& AllocVal : Json.array_items()) + { + NomadAllocInfo Alloc; + Alloc.Id = AllocVal["ID"].string_value(); + Alloc.ClientStatus = AllocVal["ClientStatus"].string_value(); + + if (const json11::Json TaskStates = AllocVal["TaskStates"]; TaskStates.is_object()) + { + for (const auto& [TaskName, TaskState] : TaskStates.object_items()) + { + if (TaskState["State"].is_string()) + { + Alloc.TaskState = TaskState["State"].string_value(); + } + } + } + + Allocs.push_back(std::move(Alloc)); + } + + return Allocs; +} + +std::vector<NomadJobInfo> +NomadTestClient::ListJobs(std::string_view Prefix) +{ + ZEN_TRACE_CPU("ListNomadJobs"); + + std::vector<NomadJobInfo> Jobs; + + std::string Url = "v1/jobs"; + if (!Prefix.empty()) + { + Url = fmt::format("v1/jobs?prefix={}", Prefix); + } + + HttpClient::Response Response = m_HttpClient.Get(Url); + + if (!Response || !Response.IsSuccess()) + { + ZEN_WARN("NomadTestClient: ListJobs failed"); + return Jobs; + } + + std::string Body(Response.AsText()); + std::string Err; + const json11::Json Json = json11::Json::parse(Body, Err); + + if (!Err.empty() || !Json.is_array()) + { + return Jobs; + } + + for (const json11::Json& JobVal : Json.array_items()) + { + NomadJobInfo Job; + Job.Id = JobVal["ID"].string_value(); + Job.Status = JobVal["Status"].string_value(); + if (const json11::Json Desc = JobVal["StatusDescription"]; Desc.is_string()) + { + Job.StatusDescription = Desc.string_value(); + } + Jobs.push_back(std::move(Job)); + } + + return Jobs; +} + +} // namespace zen::nomad |