// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen::consul { ////////////////////////////////////////////////////////////////////////// struct ConsulProcess::Impl { Impl(std::string_view BaseUri, std::string_view Token = "") : m_Token(Token), m_HttpClient(BaseUri) {} ~Impl() = default; void SpawnConsulAgent() { if (m_ProcessHandle.IsValid()) { return; } CreateProcOptions Options; Options.Flags |= CreateProcOptions::Flag_NewProcessGroup; const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL); CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { m_ProcessHandle.Initialize(Result); Stopwatch Timer; // Poll to check when the agent is ready HttpClient::KeyValueMap AdditionalHeaders; if (!m_Token.empty()) { AdditionalHeaders.Entries.emplace("X-Consul-Token", m_Token); } do { Sleep(100); HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader", AdditionalHeaders); if (Resp) { ZEN_INFO("Consul agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return; } } while (Timer.GetElapsedTimeMs() < 10000); } // Report failure! ZEN_WARN("Consul agent failed to start within timeout period"); } void StopConsulAgent() { if (!m_ProcessHandle.IsValid()) { return; } // This waits for the process to exit and also resets the handle m_ProcessHandle.Kill(); } private: ProcessHandle m_ProcessHandle; std::string m_Token; HttpClient m_HttpClient; }; ConsulProcess::ConsulProcess() : m_Impl(std::make_unique("http://localhost:8500/")) { } ConsulProcess::~ConsulProcess() { } void ConsulProcess::SpawnConsulAgent() { m_Impl->SpawnConsulAgent(); } void ConsulProcess::StopConsulAgent() { m_Impl->StopConsulAgent(); } ////////////////////////////////////////////////////////////////////////// ConsulClient::ConsulClient(const Configuration& Config) : m_Config(Config), m_HttpClient(m_Config.BaseUri) { } ConsulClient::~ConsulClient() { } void ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); ValueBuffer.SetContentType(HttpContentType::kText); HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage("")); } } std::string ConsulClient::GetKeyValue(std::string_view Key) { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key), AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::GetKeyValue() failed to get key '{}' ({})", Key, Result.ErrorMessage("")); } return Result.ToText(); } void ConsulClient::DeleteKey(std::string_view Key) { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key), AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::DeleteKey() failed to delete key '{}' ({})", Key, Result.ErrorMessage("")); } } bool ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) { using namespace std::literals; HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); HttpClient::KeyValueMap AdditionalParameters(std::make_pair("replace-existing-checks", "true")); CbObjectWriter Writer; { Writer.AddString("ID"sv, Info.ServiceId); Writer.AddString("Name"sv, Info.ServiceName); if (!Info.Address.empty()) { Writer.AddString("Address"sv, Info.Address); } Writer.AddInteger("Port"sv, Info.Port); if (!Info.Tags.empty()) { Writer.BeginArray("Tags"sv); for (const std::pair& Tag : Info.Tags) { Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second)); } Writer.EndArray(); // Tags } if (Info.HealthIntervalSeconds != 0) { // Consul requires Interval whenever HTTP is specified; omit the Check block entirely // when no interval is configured (e.g. during Provisioning). Writer.BeginObject("Check"sv); { Writer.AddString( "HTTP"sv, fmt::format("http://{}:{}/{}", Info.Address.empty() ? "localhost" : Info.Address, Info.Port, Info.HealthEndpoint)); Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds)); if (Info.DeregisterAfterSeconds != 0) { Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds)); } } Writer.EndObject(); // Check } } ExtendableStringBuilder<512> SB; CompactBinaryToJson(Writer.Save(), SB); IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); PayloadBuffer.SetContentType(HttpContentType::kJSON); HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, AdditionalHeaders, AdditionalParameters); if (!Result) { ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); return false; } return true; } bool ConsulClient::DeregisterService(std::string_view ServiceId) { using namespace std::literals; HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, AdditionalHeaders); if (Result) { return true; } // Agent deregister failed — fall back to catalog deregister. // This handles cases where the service was registered via a different Consul agent // (e.g. load-balanced endpoint routing to different agents). std::string NodeName = GetNodeName(); if (!NodeName.empty()) { CbObjectWriter Writer; Writer.AddString("Node"sv, NodeName); Writer.AddString("ServiceID"sv, ServiceId); ExtendableStringBuilder<256> SB; CompactBinaryToJson(Writer.Save(), SB); IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); PayloadBuffer.SetContentType(HttpContentType::kJSON); HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders); if (CatalogResult) { ZEN_INFO("ConsulClient::DeregisterService() deregistered service '{}' via catalog fallback (agent error: {})", ServiceId, Result.ErrorMessage("")); return true; } ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, catalog: {})", ServiceId, Result.ErrorMessage(""), CatalogResult.ErrorMessage("")); } else { ZEN_WARN( "ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, could not determine node name for catalog " "fallback)", ServiceId, Result.ErrorMessage("")); } return false; } std::string ConsulClient::GetNodeName() { using namespace std::literals; HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Get("v1/agent/self", AdditionalHeaders); if (!Result) { return {}; } std::string JsonError; CbFieldIterator Root = LoadCompactBinaryFromJson(Result.AsText(), JsonError); if (!Root || !JsonError.empty()) { return {}; } for (CbFieldView Field : Root) { if (Field.GetName() == "Config"sv) { CbObjectView Config = Field.AsObjectView(); if (Config) { return std::string(Config["NodeName"sv].AsString()); } } } return {}; } void ConsulClient::ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap) { std::string Token; if (!m_Config.StaticToken.empty()) { Token = m_Config.StaticToken; } else if (!m_Config.TokenEnvName.empty()) { Token = GetEnvVariable(m_Config.TokenEnvName); } if (!Token.empty()) { InOutHeaderMap.Entries.emplace("X-Consul-Token", Token); } } bool ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId) { using namespace std::literals; std::string JsonError; CbFieldIterator Root = LoadCompactBinaryFromJson(Json, JsonError); if (Root && JsonError.empty()) { for (CbFieldView RootIterator : Root) { CbObjectView ServiceObject = RootIterator.AsObjectView(); if (ServiceObject) { for (CbFieldView ServiceIterator : ServiceObject) { CbObjectView ObjectIterator = ServiceIterator.AsObjectView(); if (ObjectIterator["ID"sv].AsString() == ServiceId) { return true; } } } } } return false; } bool ConsulClient::HasService(std::string_view ServiceId) { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders); if (!Result) { return false; } return FindServiceInJson(Result.AsText(), ServiceId); } bool ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds) { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter // governs the server-side bound on the blocking query. Do not add a separate client timeout. HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}}); HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders, Parameters); if (!Result) { return false; } auto IndexIt = Result.Header->find("X-Consul-Index"); if (IndexIt != Result.Header->end()) { std::optional ParsedIndex = ParseInt(IndexIt->second); if (ParsedIndex.has_value()) { InOutIndex = ParsedIndex.value(); } } return FindServiceInJson(Result.AsText(), ServiceId); } std::string ConsulClient::GetAgentServicesJson() { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders); if (!Result) { return "{}"; } return Result.ToText(); } std::string ConsulClient::GetAgentChecksJson() { HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); HttpClient::Response Result = m_HttpClient.Get("v1/agent/checks", AdditionalHeaders); if (!Result) { return "{}"; } return Result.ToText(); } ////////////////////////////////////////////////////////////////////////// ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) { m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); } ServiceRegistration::~ServiceRegistration() { try { m_StopEvent.Set(); if (m_RegistrationThread.joinable()) { m_RegistrationThread.join(); } if (m_IsRegistered.load()) { if (!m_Client->DeregisterService(m_Info.ServiceId)) { ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); } } } catch (const std::exception& Ex) { ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what()); } } void ServiceRegistration::WaitForReadyEvent(int MaxWaitMS) { if (m_IsRegistered.load()) { return; } m_ReadyWakeupEvent.Wait(MaxWaitMS); } void ServiceRegistration::RegistrationLoop() { try { SetCurrentThreadName("ConsulRegistration"); // Exponential backoff delays: 100ms, 200ms, 400ms constexpr int BackoffDelaysMs[] = {100, 200, 400}; constexpr int MaxAttempts = 3; constexpr int RetryIntervalMs = 5000; while (true) { bool Succeeded = false; // Try to register with exponential backoff for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt) { if (m_Client->RegisterService(m_Info)) { Succeeded = true; break; } if (m_StopEvent.Wait(BackoffDelaysMs[Attempt])) { return; } } if (Succeeded || m_Client->RegisterService(m_Info)) { break; } // All attempts failed, log warning and wait before retrying ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", m_Info.ServiceId, MaxAttempts + 1, RetryIntervalMs / 1000); // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor // wakes this immediately so the thread exits without polling. if (m_StopEvent.Wait(RetryIntervalMs)) { return; } } ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); m_IsRegistered.store(true); m_ReadyWakeupEvent.Set(); } catch (const std::exception& Ex) { ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what()); } } ////////////////////////////////////////////////////////////////////////// // Tests #if ZEN_WITH_TESTS void consul_forcelink() { } struct MockHealthService : public HttpService { std::atomic FailHealth{false}; std::atomic HealthCheckCount{0}; const char* BaseUri() const override { return "/"; } void HandleRequest(HttpServerRequest& Request) override { std::string_view Uri = Request.RelativeUri(); if (Uri == "health/" || Uri == "health") { HealthCheckCount.fetch_add(1); if (FailHealth.load()) { Request.WriteResponse(HttpResponseCode::ServiceUnavailable); } else { Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); } return; } Request.WriteResponse(HttpResponseCode::NotFound); } }; struct TestHealthServer { MockHealthService Mock; void Start() { m_TmpDir.emplace(); m_Server = CreateHttpServer(HttpServerConfig{.ServerClass = "asio"}); m_Port = m_Server->Initialize(0, m_TmpDir->Path() / "http"); REQUIRE(m_Port != -1); m_Server->RegisterService(Mock); m_ServerThread = std::thread([this]() { m_Server->Run(false); }); } int Port() const { return m_Port; } ~TestHealthServer() { if (m_Server) { m_Server->RequestExit(); } if (m_ServerThread.joinable()) { m_ServerThread.join(); } if (m_Server) { m_Server->Close(); } } private: std::optional m_TmpDir; Ref m_Server; std::thread m_ServerThread; int m_Port = -1; }; static bool WaitForCondition(std::function Predicate, int TimeoutMs, int PollIntervalMs = 200) { Stopwatch Timer; while (Timer.GetElapsedTimeMs() < static_cast(TimeoutMs)) { if (Predicate()) { return true; } Sleep(PollIntervalMs); } return Predicate(); } static std::string GetCheckStatus(ConsulClient& Client, std::string_view ServiceId) { using namespace std::literals; std::string JsonError; CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), JsonError); if (!ChecksRoot || !JsonError.empty()) { return {}; } for (CbFieldView F : ChecksRoot) { if (!F.IsObject()) { continue; } for (CbFieldView C : F.AsObjectView()) { CbObjectView Check = C.AsObjectView(); if (Check["ServiceID"sv].AsString() == ServiceId) { return std::string(Check["Status"sv].AsString()); } } } return {}; } TEST_SUITE_BEGIN("util.consul"); TEST_CASE("util.consul.service_lifecycle") { ConsulProcess ConsulProc; ConsulProc.SpawnConsulAgent(); TestHealthServer HealthServer; HealthServer.Start(); ConsulClient Client({.BaseUri = "http://localhost:8500/"}); const std::string ServiceId = "test-health-svc"; ServiceRegistrationInfo Info; Info.ServiceId = ServiceId; Info.ServiceName = "zen-test-health"; Info.Address = "127.0.0.1"; Info.Port = static_cast(HealthServer.Port()); Info.HealthEndpoint = "health/"; Info.HealthIntervalSeconds = 1; Info.DeregisterAfterSeconds = 60; // Phase 1: Register and verify Consul sends health checks to our service REQUIRE(Client.RegisterService(Info)); REQUIRE(Client.HasService(ServiceId)); REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); // Phase 2: Explicit deregister REQUIRE(Client.DeregisterService(ServiceId)); CHECK_FALSE(Client.HasService(ServiceId)); // Phase 3: Register again, verify passing, then fail health and verify check goes critical HealthServer.Mock.HealthCheckCount.store(0); HealthServer.Mock.FailHealth.store(false); REQUIRE(Client.RegisterService(Info)); REQUIRE(Client.HasService(ServiceId)); REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); HealthServer.Mock.FailHealth.store(true); // Wait for Consul to observe the failing check REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000)); CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical"); // Phase 4: Explicit deregister while critical REQUIRE(Client.DeregisterService(ServiceId)); CHECK_FALSE(Client.HasService(ServiceId)); // Phase 5: Deregister an already-deregistered service - should not crash Client.DeregisterService(ServiceId); CHECK_FALSE(Client.HasService(ServiceId)); ConsulProc.StopConsulAgent(); } TEST_SUITE_END(); #endif } // namespace zen::consul