diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-03 20:45:52 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-03 20:45:52 +0100 |
| commit | 610dac1a31e19edb5fb9ba9080487ad68ddf85d8 (patch) | |
| tree | 8c384f4dda2eef7530e08e4a45c23b507389000c /src | |
| parent | fix objectstore uri path parsing (#801) (diff) | |
| download | zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.tar.xz zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.zip | |
initial implementation from
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/diag/diagsvcs.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.cpp | 71 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.h | 5 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 55 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 9 | ||||
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 150 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 51 |
7 files changed, 335 insertions, 10 deletions
diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp index d8d53b0e3..e6811ef04 100644 --- a/src/zenserver/diag/diagsvcs.cpp +++ b/src/zenserver/diag/diagsvcs.cpp @@ -36,7 +36,7 @@ HttpHealthService::HttpHealthService() "", [](HttpRouterRequest& RoutedReq) { HttpServerRequest& HttpReq = RoutedReq.ServerRequest(); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv); + HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kGet); @@ -118,7 +118,7 @@ HttpHealthService::HandleRequest(HttpServerRequest& Request) ZEN_MEMSCOPE(GetHealthTag()); if (!m_Router.HandleRequest(Request)) { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv); + Request.WriteResponse(HttpResponseCode::OK); } } diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp index bf0e294c5..cc34e86bb 100644 --- a/src/zenserver/hub/hubservice.cpp +++ b/src/zenserver/hub/hubservice.cpp @@ -8,9 +8,9 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zencore/process.h> #include <zencore/scopeguard.h> #include <zencore/system.h> +#include <zenutil/consul.h> #include <zenutil/zenserverprocess.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -135,7 +135,8 @@ struct StorageServerInstance StorageServerInstance(ZenServerEnvironment& RunEnvironment, std::string_view ModuleId, std::filesystem::path FileHydrationPath, - std::filesystem::path HydrationTempPath); + std::filesystem::path HydrationTempPath, + consul::ConsulClient* ConsulClient = nullptr); ~StorageServerInstance(); void Provision(); @@ -169,6 +170,7 @@ private: #if ZEN_PLATFORM_WINDOWS JobObject* m_JobObject = nullptr; #endif + consul::ConsulClient* m_ConsulClient; void SpawnServerProcess(); @@ -179,10 +181,12 @@ private: StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, std::string_view ModuleId, std::filesystem::path FileHydrationPath, - std::filesystem::path HydrationTempPath) + std::filesystem::path HydrationTempPath, + consul::ConsulClient* ConsulClient) : m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) , m_HydrationPath(FileHydrationPath) +, m_ConsulClient(ConsulClient) { m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); m_TempDir = HydrationTempPath / ModuleId; @@ -234,6 +238,28 @@ StorageServerInstance::Provision() SpawnServerProcess(); } + // Register with Consul if client is available + if (m_ConsulClient != nullptr) + { + consul::ServiceRegistrationInfo Info; + Info.ServiceId = m_ModuleId; + Info.ServiceName = "zen-service"; + Info.Address = "localhost"; + Info.Port = GetBasePort(); + Info.HealthEndpoint = "health"; + Info.HealthIntervalSeconds = 10; + Info.DeregisterAfterSeconds = 30; + + if (!m_ConsulClient->RegisterService(Info)) + { + ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", m_ModuleId); + } + else + { + ZEN_INFO("Registered storage server instance for module '{}' with Consul as '{}'", m_ModuleId, Info.ServiceName); + } + } + m_IsProvisioned = true; } @@ -251,6 +277,19 @@ StorageServerInstance::Deprovision() ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); + // Deregister from Consul before shutdown + if (m_ConsulClient != nullptr) + { + if (!m_ConsulClient->DeregisterService(m_ModuleId)) + { + ZEN_WARN("Failed to deregister storage server instance for module '{}' from Consul, continuing anyway", m_ModuleId); + } + else + { + ZEN_INFO("Deregistered storage server instance for module '{}' from Consul", m_ModuleId); + } + } + m_ServerInstance.Shutdown(); Dehydrate(); @@ -445,9 +484,12 @@ struct HttpHubService::Impl return false; } - IsNewInstance = true; - auto NewInstance = - std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); + IsNewInstance = true; + auto NewInstance = std::make_unique<StorageServerInstance>(m_RunEnvironment, + ModuleId, + m_FileHydrationPath, + m_HydrationTempPath, + m_ConsulClient); #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { @@ -625,6 +667,7 @@ private: std::unordered_set<std::string> m_DeprovisioningModules; std::unordered_set<std::string> m_ProvisioningModules; int m_MaxInstanceCount = 0; + consul::ConsulClient* m_ConsulClient = nullptr; void UpdateStats(); // Capacity tracking @@ -635,6 +678,9 @@ private: void UpdateCapacityMetrics(); bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + +public: + void SetConsulClient(consul::ConsulClient* Client) { m_ConsulClient = Client; } }; HttpHubService::Impl::Impl() @@ -718,7 +764,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool { for (const auto C : Str) { - if (std::isalnum(C) || C == '-') + if (std::isalnum(C) || C == '-' || C == ':') { // fine } @@ -749,6 +795,11 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem HttpVerb::kGet); m_Router.RegisterRoute( + "health", + [](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "modules/{moduleid}", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); @@ -874,6 +925,12 @@ HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEnd } void +HttpHubService::SetConsulClient(consul::ConsulClient* Client) +{ + m_Impl->SetConsulClient(Client); +} + +void HttpHubService::HandleRequest(zen::HttpServerRequest& Request) { m_Router.HandleRequest(Request); diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h index ef24bba69..0c5b5adb9 100644 --- a/src/zenserver/hub/hubservice.h +++ b/src/zenserver/hub/hubservice.h @@ -6,6 +6,10 @@ #include "hydration.h" +namespace zen::consul { +class ConsulClient; +} + namespace zen { /** ZenServer Hub Service @@ -34,6 +38,7 @@ public: * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. */ void SetUseJobObject(bool Enable); + void SetConsulClient(consul::ConsulClient* Client); private: HttpRequestRouter m_Router; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index d0a0db417..388008350 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -36,6 +36,13 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Instance ID for use in notifications", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); + + Options.add_option("hub", + "", + "consul-endpoint", + "Consul endpoint URL for service registration (empty = disabled)", + cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), + ""); } void @@ -97,6 +104,9 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: InitializeServices(ServerConfig); RegisterServices(ServerConfig); + // Initialize Consul registration after services are set up (REQ-F-12: non-blocking, failure tolerant) + InitializeConsulRegistration(ServerConfig, EffectiveBasePort); + ZenServerBase::Finalize(); return EffectiveBasePort; @@ -109,6 +119,10 @@ ZenHubServer::Cleanup() ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { + // Deregister from Consul first (destructor triggers deregistration) + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); + m_IoContext.stop(); if (m_IoRunner.joinable()) { @@ -162,6 +176,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) } void +ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort) +{ + if (ServerConfig.ConsulEndpoint.empty()) + { + ZEN_INFO("Consul registration disabled (no endpoint configured)"); + return; + } + + ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint); + + try + { + m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint); + + consul::ServiceRegistrationInfo Info; + Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); + Info.ServiceName = "zen-hub"; + Info.Address = "localhost"; + Info.Port = static_cast<uint16_t>(EffectivePort); + Info.HealthEndpoint = "hub/health"; + + m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info); + + // Pass Consul client to hub service for child registration + if (m_HubService) + { + m_HubService->SetConsulClient(m_ConsulClient.get()); + } + + ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId); + } + catch (const std::exception& Ex) + { + // REQ-F-12: Hub should start successfully even if Consul registration fails + ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what()); + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); + } +} + +void ZenHubServer::Run() { if (m_ProcessMonitor.IsActive()) diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index ac14362f0..435d8d248 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -4,6 +4,8 @@ #include "zenserver.h" +#include <zenutil/consul.h> + namespace cxxopts { class Options; } @@ -19,7 +21,8 @@ class HttpHubService; struct ZenHubServerConfig : public ZenServerConfig { std::string UpstreamNotificationEndpoint; - std::string InstanceId; // For use in notifications + std::string InstanceId; // For use in notifications + std::string ConsulEndpoint; // If set, enables Consul service registration }; struct ZenHubServerConfigurator : public ZenServerConfiguratorBase @@ -84,9 +87,13 @@ private: std::unique_ptr<HttpHubService> m_HubService; std::unique_ptr<HttpApiService> m_ApiService; + std::unique_ptr<consul::ConsulClient> m_ConsulClient; + std::unique_ptr<consul::ServiceRegistration> m_ConsulRegistration; + void InitializeState(const ZenHubServerConfig& ServerConfig); void InitializeServices(const ZenHubServerConfig& ServerConfig); void RegisterServices(const ZenHubServerConfig& ServerConfig); + void InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort); }; } // namespace zen diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index 6ddebf97a..ef13137d9 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -7,6 +7,7 @@ #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <fmt/format.h> @@ -137,4 +138,153 @@ ConsulClient::DeleteKey(std::string_view Key) } } +bool +ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) +{ + std::string JsonPayload = fmt::format( + R"({{"ID":"{}","Name":"{}","Address":"{}","Port":{},"Check":{{"HTTP":"http://{}:{}/{}","Interval":"{}s","DeregisterCriticalServiceAfter":"{}s"}}}})", + Info.ServiceId, + Info.ServiceName, + Info.Address, + Info.Port, + Info.Address, + Info.Port, + Info.HealthEndpoint, + Info.HealthIntervalSeconds, + Info.DeregisterAfterSeconds); + + IoBuffer PayloadBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(JsonPayload)); + HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", + PayloadBuffer, + {{"Content-Type", "application/json"}, {"Accept", "application/json"}}); + + if (!Result) + { + ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + HttpClient::Response Result = + m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, {{"Accept", "application/json"}}); + + if (!Result) + { + ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::HasService(std::string_view ServiceId) +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return false; + } + + std::string ServicesJson = Result.ToText(); + // Simple string search for the service ID in the JSON response + return ServicesJson.find(fmt::format("\"{}\"", ServiceId)) != std::string::npos; +} + +std::string +ConsulClient::GetAgentServicesJson() +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +////////////////////////////////////////////////////////////////////////// + +ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) +{ + m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); +} + +ServiceRegistration::~ServiceRegistration() +{ + m_ShouldStop.store(true); + + if (m_RegistrationThread.joinable()) + { + m_RegistrationThread.join(); + } + + if (m_IsRegistered.load()) + { + if (!m_Client->DeregisterService(m_Info.ServiceId)) + { + ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); + } + } +} + +void +ServiceRegistration::RegistrationLoop() +{ + SetCurrentThreadName("ConsulRegistration"); + + // Exponential backoff delays: 100ms, 200ms, 400ms + constexpr int BackoffDelaysMs[] = {100, 200, 400}; + constexpr int MaxAttempts = 3; + constexpr int RetryIntervalMs = 5000; + + while (!m_ShouldStop.load()) + { + // Try to register with exponential backoff + bool Registered = false; + + for (int Attempt = 0; Attempt < MaxAttempts && !m_ShouldStop.load(); ++Attempt) + { + if (m_Client->RegisterService(m_Info)) + { + m_IsRegistered.store(true); + ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); + Registered = true; + break; + } + + // Apply backoff delay before next attempt (except after last attempt) + if (Attempt < MaxAttempts - 1) + { + Sleep(BackoffDelaysMs[Attempt]); + } + } + + if (Registered) + { + // Registration succeeded, exit the loop + return; + } + + // All attempts failed, log warning and wait before retrying + ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", + m_Info.ServiceId, + MaxAttempts, + RetryIntervalMs / 1000); + + // Wait for 5 seconds before retrying, checking for stop signal periodically + constexpr int CheckIntervalMs = 100; + int WaitedMs = 0; + while (WaitedMs < RetryIntervalMs && !m_ShouldStop.load()) + { + Sleep(CheckIntervalMs); + WaitedMs += CheckIntervalMs; + } + } +} + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index 08871fa66..74e6abce4 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -5,11 +5,25 @@ #include <zenbase/zenbase.h> #include <zenhttp/httpclient.h> +#include <atomic> +#include <cstdint> #include <string> #include <string_view> +#include <thread> namespace zen::consul { +struct ServiceRegistrationInfo +{ + std::string ServiceId; + std::string ServiceName; + std::string Address; + uint16_t Port = 0; + std::string HealthEndpoint; + int HealthIntervalSeconds = 10; + int DeregisterAfterSeconds = 30; +}; + class ConsulClient { public: @@ -23,6 +37,13 @@ public: std::string GetKeyValue(std::string_view Key); void DeleteKey(std::string_view Key); + bool RegisterService(const ServiceRegistrationInfo& Info); + bool DeregisterService(std::string_view ServiceId); + + // Query methods for testing + bool HasService(std::string_view ServiceId); + std::string GetAgentServicesJson(); + private: HttpClient m_HttpClient; }; @@ -44,4 +65,34 @@ private: std::unique_ptr<Impl> m_Impl; }; +/** + * RAII wrapper for Consul service registration. + * + * Manages the lifecycle of a Consul service registration with: + * - Async registration in a background thread + * - Exponential backoff retry on failure (100ms, 200ms, 400ms) + * - Continuous background retry every 5 seconds after initial failures + * - Automatic deregistration on destruction + */ +class ServiceRegistration +{ +public: + ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info); + ~ServiceRegistration(); + + ServiceRegistration(const ServiceRegistration&) = delete; + ServiceRegistration& operator=(const ServiceRegistration&) = delete; + + bool IsRegistered() const { return m_IsRegistered.load(); } + +private: + ConsulClient* m_Client; + ServiceRegistrationInfo m_Info; + std::atomic<bool> m_IsRegistered{false}; + std::atomic<bool> m_ShouldStop{false}; + std::thread m_RegistrationThread; + + void RegistrationLoop(); +}; + } // namespace zen::consul |