// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen::nomad { ////////////////////////////////////////////////////////////////////////// struct NomadProcess::Impl { Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {} ~Impl() = default; void SpawnNomadAgent() { ZEN_TRACE_CPU("SpawnNomadAgent"); if (m_ProcessHandle.IsValid()) { return; } CreateProcOptions Options; Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; CreateProcResult Result = CreateProc("nomad" ZEN_EXE_SUFFIX_LITERAL, "nomad" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { m_ProcessHandle.Initialize(Result); Stopwatch Timer; // Poll to check when the agent is ready do { Sleep(100); HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader"); if (Resp) { ZEN_INFO("Nomad agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return; } } while (Timer.GetElapsedTimeMs() < 30000); } // Report failure! ZEN_WARN("Nomad agent failed to start within timeout period"); } void StopNomadAgent() { if (!m_ProcessHandle.IsValid()) { return; } // This waits for the process to exit and also resets the handle m_ProcessHandle.Kill(); } private: ProcessHandle m_ProcessHandle; HttpClient m_HttpClient; }; NomadProcess::NomadProcess() : m_Impl(std::make_unique("http://localhost:4646/")) { } NomadProcess::~NomadProcess() { } void NomadProcess::SpawnNomadAgent() { m_Impl->SpawnNomadAgent(); } void NomadProcess::StopNomadAgent() { m_Impl->StopNomadAgent(); } ////////////////////////////////////////////////////////////////////////// NomadTestClient::NomadTestClient(std::string_view BaseUri) : m_HttpClient(BaseUri) { } NomadTestClient::~NomadTestClient() { } NomadJobInfo NomadTestClient::SubmitJob(std::string_view JobId, std::string_view Command, const std::vector& Args) { ZEN_TRACE_CPU("SubmitNomadJob"); NomadJobInfo Result; // Build the job JSON for a raw_exec batch job json11::Json::object TaskConfig; TaskConfig["command"] = std::string(Command); json11::Json::array JsonArgs; for (const auto& Arg : Args) { JsonArgs.push_back(Arg); } TaskConfig["args"] = JsonArgs; json11::Json::object Resources; Resources["CPU"] = 100; Resources["MemoryMB"] = 64; json11::Json::object Task; Task["Name"] = "test-task"; Task["Driver"] = "raw_exec"; Task["Config"] = TaskConfig; Task["Resources"] = Resources; json11::Json::array Tasks; Tasks.push_back(Task); json11::Json::object Group; Group["Name"] = "test-group"; Group["Count"] = 1; Group["Tasks"] = Tasks; json11::Json::array Groups; Groups.push_back(Group); json11::Json::array Datacenters; Datacenters.push_back("dc1"); json11::Json::object Job; Job["ID"] = std::string(JobId); Job["Name"] = std::string(JobId); Job["Type"] = "batch"; Job["Datacenters"] = Datacenters; Job["TaskGroups"] = Groups; json11::Json::object Root; Root["Job"] = Job; std::string Body = json11::Json(Root).dump(); IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{Body.data(), Body.size()}, ZenContentType::kJSON); HttpClient::Response Response = m_HttpClient.Put("v1/jobs", Payload, {{"Content-Type", "application/json"}, {"Accept", "application/json"}}); if (!Response || !Response.IsSuccess()) { ZEN_WARN("NomadTestClient: SubmitJob failed for '{}'", JobId); return Result; } std::string ResponseBody(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(ResponseBody, Err); if (!Err.empty()) { ZEN_WARN("NomadTestClient: invalid JSON in SubmitJob response: {}", Err); return Result; } Result.Id = std::string(JobId); Result.Status = "pending"; ZEN_INFO("NomadTestClient: job '{}' submitted (eval_id={})", JobId, Json["EvalID"].string_value()); return Result; } NomadJobInfo NomadTestClient::GetJobStatus(std::string_view JobId) { ZEN_TRACE_CPU("GetNomadJobStatus"); NomadJobInfo Result; HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}", JobId)); if (Response.Error) { ZEN_WARN("NomadTestClient: GetJobStatus failed for '{}': {}", JobId, Response.Error->ErrorMessage); return Result; } if (static_cast(Response.StatusCode) == 404) { Result.Status = "dead"; return Result; } if (!Response.IsSuccess()) { ZEN_WARN("NomadTestClient: GetJobStatus failed with HTTP/{}", static_cast(Response.StatusCode)); return Result; } std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty()) { ZEN_WARN("NomadTestClient: invalid JSON in GetJobStatus response: {}", Err); return Result; } Result.Id = Json["ID"].string_value(); Result.Status = Json["Status"].string_value(); if (const json11::Json Desc = Json["StatusDescription"]; Desc.is_string()) { Result.StatusDescription = Desc.string_value(); } return Result; } void NomadTestClient::StopJob(std::string_view JobId) { ZEN_TRACE_CPU("StopNomadJob"); HttpClient::Response Response = m_HttpClient.Delete(fmt::format("v1/job/{}", JobId)); if (!Response || !Response.IsSuccess()) { ZEN_WARN("NomadTestClient: StopJob failed for '{}'", JobId); return; } ZEN_INFO("NomadTestClient: job '{}' stopped", JobId); } std::vector NomadTestClient::GetAllocations(std::string_view JobId) { ZEN_TRACE_CPU("GetNomadAllocations"); std::vector Allocs; HttpClient::Response Response = m_HttpClient.Get(fmt::format("v1/job/{}/allocations", JobId)); if (!Response || !Response.IsSuccess()) { ZEN_WARN("NomadTestClient: GetAllocations failed for '{}'", JobId); return Allocs; } std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty() || !Json.is_array()) { return Allocs; } for (const json11::Json& AllocVal : Json.array_items()) { NomadAllocInfo Alloc; Alloc.Id = AllocVal["ID"].string_value(); Alloc.ClientStatus = AllocVal["ClientStatus"].string_value(); if (const json11::Json TaskStates = AllocVal["TaskStates"]; TaskStates.is_object()) { for (const auto& [TaskName, TaskState] : TaskStates.object_items()) { if (TaskState["State"].is_string()) { Alloc.TaskState = TaskState["State"].string_value(); } } } Allocs.push_back(std::move(Alloc)); } return Allocs; } std::vector NomadTestClient::ListJobs(std::string_view Prefix) { ZEN_TRACE_CPU("ListNomadJobs"); std::vector Jobs; std::string Url = "v1/jobs"; if (!Prefix.empty()) { Url = fmt::format("v1/jobs?prefix={}", Prefix); } HttpClient::Response Response = m_HttpClient.Get(Url); if (!Response || !Response.IsSuccess()) { ZEN_WARN("NomadTestClient: ListJobs failed"); return Jobs; } std::string Body(Response.AsText()); std::string Err; const json11::Json Json = json11::Json::parse(Body, Err); if (!Err.empty() || !Json.is_array()) { return Jobs; } for (const json11::Json& JobVal : Json.array_items()) { NomadJobInfo Job; Job.Id = JobVal["ID"].string_value(); Job.Status = JobVal["Status"].string_value(); if (const json11::Json Desc = JobVal["StatusDescription"]; Desc.is_string()) { Job.StatusDescription = Desc.string_value(); } Jobs.push_back(std::move(Job)); } return Jobs; } } // namespace zen::nomad