// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include namespace zen::consul { ////////////////////////////////////////////////////////////////////////// struct ConsulProcess::Impl { Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {} ~Impl() = default; void SpawnConsulAgent() { if (m_ProcessHandle.IsValid()) { return; } CreateProcOptions Options; Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { m_ProcessHandle.Initialize(Result); Stopwatch Timer; // Poll to check when the agent is ready do { Sleep(100); HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader"); if (Resp) { ZEN_INFO("Consul agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return; } } while (Timer.GetElapsedTimeMs() < 10000); } // Report failure! ZEN_WARN("Consul agent failed to start within timeout period"); } void StopConsulAgent() { if (!m_ProcessHandle.IsValid()) { return; } // This waits for the process to exit and also resets the handle m_ProcessHandle.Kill(); } private: ProcessHandle m_ProcessHandle; HttpClient m_HttpClient; }; ConsulProcess::ConsulProcess() : m_Impl(std::make_unique("http://localhost:8500/")) { } ConsulProcess::~ConsulProcess() { } void ConsulProcess::SpawnConsulAgent() { m_Impl->SpawnConsulAgent(); } void ConsulProcess::StopConsulAgent() { m_Impl->StopConsulAgent(); } ////////////////////////////////////////////////////////////////////////// ConsulClient::ConsulClient(std::string_view BaseUri) : m_HttpClient(BaseUri) { } ConsulClient::~ConsulClient() { } void ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) { IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}}); if (!Result) { throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage("")); } } std::string ConsulClient::GetKeyValue(std::string_view Key) { HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key)); if (!Result) { throw runtime_error("ConsulClient::GetKeyValue() failed to get key '{}' ({})", Key, Result.ErrorMessage("")); } return Result.ToText(); } void ConsulClient::DeleteKey(std::string_view Key) { HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key)); if (!Result) { throw runtime_error("ConsulClient::DeleteKey() failed to delete key '{}' ({})", Key, Result.ErrorMessage("")); } } bool ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) { std::string JsonPayload = fmt::format( R"({{"ID":"{}","Name":"{}","Address":"{}","Port":{},"Check":{{"HTTP":"http://{}:{}/{}","Interval":"{}s","DeregisterCriticalServiceAfter":"{}s"}}}})", Info.ServiceId, Info.ServiceName, Info.Address, Info.Port, Info.Address, Info.Port, Info.HealthEndpoint, Info.HealthIntervalSeconds, Info.DeregisterAfterSeconds); IoBuffer PayloadBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(JsonPayload)); HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, {{"Content-Type", "application/json"}, {"Accept", "application/json"}}); if (!Result) { ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); return false; } return true; } bool ConsulClient::DeregisterService(std::string_view ServiceId) { HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, {{"Accept", "application/json"}}); if (!Result) { ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); return false; } return true; } bool ConsulClient::HasService(std::string_view ServiceId) { HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); if (!Result) { return false; } std::string ServicesJson = Result.ToText(); // Simple string search for the service ID in the JSON response return ServicesJson.find(fmt::format("\"{}\"", ServiceId)) != std::string::npos; } std::string ConsulClient::GetAgentServicesJson() { HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); if (!Result) { return "{}"; } return Result.ToText(); } ////////////////////////////////////////////////////////////////////////// ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) { m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); } ServiceRegistration::~ServiceRegistration() { m_ShouldStop.store(true); if (m_RegistrationThread.joinable()) { m_RegistrationThread.join(); } if (m_IsRegistered.load()) { if (!m_Client->DeregisterService(m_Info.ServiceId)) { ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); } } } void ServiceRegistration::RegistrationLoop() { SetCurrentThreadName("ConsulRegistration"); // Exponential backoff delays: 100ms, 200ms, 400ms constexpr int BackoffDelaysMs[] = {100, 200, 400}; constexpr int MaxAttempts = 3; constexpr int RetryIntervalMs = 5000; while (!m_ShouldStop.load()) { // Try to register with exponential backoff bool Registered = false; for (int Attempt = 0; Attempt < MaxAttempts && !m_ShouldStop.load(); ++Attempt) { if (m_Client->RegisterService(m_Info)) { m_IsRegistered.store(true); ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); Registered = true; break; } // Apply backoff delay before next attempt (except after last attempt) if (Attempt < MaxAttempts - 1) { Sleep(BackoffDelaysMs[Attempt]); } } if (Registered) { // Registration succeeded, exit the loop return; } // All attempts failed, log warning and wait before retrying ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", m_Info.ServiceId, MaxAttempts, RetryIntervalMs / 1000); // Wait for 5 seconds before retrying, checking for stop signal periodically constexpr int CheckIntervalMs = 100; int WaitedMs = 0; while (WaitedMs < RetryIntervalMs && !m_ShouldStop.load()) { Sleep(CheckIntervalMs); WaitedMs += CheckIntervalMs; } } } } // namespace zen::consul