diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-03 20:45:52 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-03 20:45:52 +0100 |
| commit | 610dac1a31e19edb5fb9ba9080487ad68ddf85d8 (patch) | |
| tree | 8c384f4dda2eef7530e08e4a45c23b507389000c /src/zenutil | |
| parent | fix objectstore uri path parsing (#801) (diff) | |
| download | zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.tar.xz zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.zip | |
initial implementation from
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 150 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 51 |
2 files changed, 201 insertions, 0 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index 6ddebf97a..ef13137d9 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -7,6 +7,7 @@ #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <fmt/format.h> @@ -137,4 +138,153 @@ ConsulClient::DeleteKey(std::string_view Key) } } +bool +ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) +{ + std::string JsonPayload = fmt::format( + R"({{"ID":"{}","Name":"{}","Address":"{}","Port":{},"Check":{{"HTTP":"http://{}:{}/{}","Interval":"{}s","DeregisterCriticalServiceAfter":"{}s"}}}})", + Info.ServiceId, + Info.ServiceName, + Info.Address, + Info.Port, + Info.Address, + Info.Port, + Info.HealthEndpoint, + Info.HealthIntervalSeconds, + Info.DeregisterAfterSeconds); + + IoBuffer PayloadBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(JsonPayload)); + HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", + PayloadBuffer, + {{"Content-Type", "application/json"}, {"Accept", "application/json"}}); + + if (!Result) + { + ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + HttpClient::Response Result = + m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, {{"Accept", "application/json"}}); + + if (!Result) + { + ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::HasService(std::string_view ServiceId) +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return false; + } + + std::string ServicesJson = Result.ToText(); + // Simple string search for the service ID in the JSON response + return ServicesJson.find(fmt::format("\"{}\"", ServiceId)) != std::string::npos; +} + +std::string +ConsulClient::GetAgentServicesJson() +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +////////////////////////////////////////////////////////////////////////// + +ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) +{ + m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); +} + +ServiceRegistration::~ServiceRegistration() +{ + m_ShouldStop.store(true); + + if (m_RegistrationThread.joinable()) + { + m_RegistrationThread.join(); + } + + if (m_IsRegistered.load()) + { + if (!m_Client->DeregisterService(m_Info.ServiceId)) + { + ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); + } + } +} + +void +ServiceRegistration::RegistrationLoop() +{ + SetCurrentThreadName("ConsulRegistration"); + + // Exponential backoff delays: 100ms, 200ms, 400ms + constexpr int BackoffDelaysMs[] = {100, 200, 400}; + constexpr int MaxAttempts = 3; + constexpr int RetryIntervalMs = 5000; + + while (!m_ShouldStop.load()) + { + // Try to register with exponential backoff + bool Registered = false; + + for (int Attempt = 0; Attempt < MaxAttempts && !m_ShouldStop.load(); ++Attempt) + { + if (m_Client->RegisterService(m_Info)) + { + m_IsRegistered.store(true); + ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); + Registered = true; + break; + } + + // Apply backoff delay before next attempt (except after last attempt) + if (Attempt < MaxAttempts - 1) + { + Sleep(BackoffDelaysMs[Attempt]); + } + } + + if (Registered) + { + // Registration succeeded, exit the loop + return; + } + + // All attempts failed, log warning and wait before retrying + ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", + m_Info.ServiceId, + MaxAttempts, + RetryIntervalMs / 1000); + + // Wait for 5 seconds before retrying, checking for stop signal periodically + constexpr int CheckIntervalMs = 100; + int WaitedMs = 0; + while (WaitedMs < RetryIntervalMs && !m_ShouldStop.load()) + { + Sleep(CheckIntervalMs); + WaitedMs += CheckIntervalMs; + } + } +} + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index 08871fa66..74e6abce4 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -5,11 +5,25 @@ #include <zenbase/zenbase.h> #include <zenhttp/httpclient.h> +#include <atomic> +#include <cstdint> #include <string> #include <string_view> +#include <thread> namespace zen::consul { +struct ServiceRegistrationInfo +{ + std::string ServiceId; + std::string ServiceName; + std::string Address; + uint16_t Port = 0; + std::string HealthEndpoint; + int HealthIntervalSeconds = 10; + int DeregisterAfterSeconds = 30; +}; + class ConsulClient { public: @@ -23,6 +37,13 @@ public: std::string GetKeyValue(std::string_view Key); void DeleteKey(std::string_view Key); + bool RegisterService(const ServiceRegistrationInfo& Info); + bool DeregisterService(std::string_view ServiceId); + + // Query methods for testing + bool HasService(std::string_view ServiceId); + std::string GetAgentServicesJson(); + private: HttpClient m_HttpClient; }; @@ -44,4 +65,34 @@ private: std::unique_ptr<Impl> m_Impl; }; +/** + * RAII wrapper for Consul service registration. + * + * Manages the lifecycle of a Consul service registration with: + * - Async registration in a background thread + * - Exponential backoff retry on failure (100ms, 200ms, 400ms) + * - Continuous background retry every 5 seconds after initial failures + * - Automatic deregistration on destruction + */ +class ServiceRegistration +{ +public: + ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info); + ~ServiceRegistration(); + + ServiceRegistration(const ServiceRegistration&) = delete; + ServiceRegistration& operator=(const ServiceRegistration&) = delete; + + bool IsRegistered() const { return m_IsRegistered.load(); } + +private: + ConsulClient* m_Client; + ServiceRegistrationInfo m_Info; + std::atomic<bool> m_IsRegistered{false}; + std::atomic<bool> m_ShouldStop{false}; + std::thread m_RegistrationThread; + + void RegistrationLoop(); +}; + } // namespace zen::consul |