diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-11 09:45:31 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-11 09:45:31 +0100 |
| commit | e9d04250225a430cffed28e6a49299e3da542f97 (patch) | |
| tree | 7f99594bd930c072e159458319e822bbb18794f7 /src/zenserver/hub/zenhubserver.cpp | |
| parent | minor zenstore/blockstore fixes (#821) (diff) | |
| download | zen-e9d04250225a430cffed28e6a49299e3da542f97.tar.xz zen-e9d04250225a430cffed28e6a49299e3da542f97.zip | |
hub consul integration (#820)
- Feature: Basic consul integration for zenserver hub mode, restricted to host local consul agent and register/deregister of services
- Feature: Added new options to zenserver hub mode
- `consul-endpoint` - Consul endpoint URL for service registration (empty = disabled)
- `hub-base-port-number` - Base port number for provisioned instances
- `hub-instance-limit` - Maximum number of provisioned instances for this hub
- `hub-use-job-object` - Enable the use of a Windows Job Object for child process management (Windows only)
Diffstat (limited to 'src/zenserver/hub/zenhubserver.cpp')
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 155 |
1 files changed, 153 insertions, 2 deletions
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 64ab0bd9e..a019ff295 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -3,9 +3,10 @@ #include "zenhubserver.h" #include "frontend/frontend.h" +#include "httphubservice.h" #include "hub.h" -#include "hubservice.h" +#include <zencore/config.h> #include <zencore/fmtutils.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> @@ -38,6 +39,35 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Instance ID for use in notifications", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); + + Options.add_option("hub", + "", + "consul-endpoint", + "Consul endpoint URL for service registration (empty = disabled)", + cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), + ""); + + Options.add_option("hub", + "", + "hub-base-port-number", + "Base port number for provisioned instances", + cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), + ""); + + Options.add_option("hub", + "", + "hub-instance-limit", + "Maximum number of provisioned instances for this hub", + cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"), + ""); +#if ZEN_PLATFORM_WINDOWS + Options.add_option("hub", + "", + "hub-use-job-object", + "Enable the use of a Windows Job Object for child process management", + cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"), + ""); +#endif // ZEN_PLATFORM_WINDOWS } void @@ -74,6 +104,56 @@ ZenHubServer::~ZenHubServer() Cleanup(); } +void +ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +{ + if (m_ConsulClient) + { + consul::ServiceRegistrationInfo ServiceInfo{ + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", + // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri? + .Port = Info.Port, + .HealthEndpoint = "health", + .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), + std::make_pair("zen-hub", std::string(HubInstanceId)), + std::make_pair("version", std::string(ZEN_CFG_VERSION))}, + .HealthIntervalSeconds = 10, + .DeregisterAfterSeconds = 30}; + + if (!m_ConsulClient->RegisterService(ServiceInfo)) + { + ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); + } + else + { + ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", + ModuleId, + Info.Port, + ServiceInfo.ServiceName); + } + } +} + +void +ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +{ + ZEN_UNUSED(HubInstanceId); + if (m_ConsulClient) + { + if (!m_ConsulClient->DeregisterService(ModuleId)) + { + ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", + ModuleId, + Info.Port); + } + else + { + ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); + } + } +} + int ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { @@ -96,6 +176,7 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); + InitializeConsulRegistration(ServerConfig, EffectiveBasePort); InitializeServices(ServerConfig); RegisterServices(ServerConfig); @@ -127,6 +208,9 @@ ZenHubServer::Cleanup() m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); + + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); } catch (const std::exception& Ex) { @@ -147,7 +231,18 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) ZEN_INFO("instantiating Hub"); m_Hub = std::make_unique<Hub>( - Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"}); + Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject, + .BasePortNumber = ServerConfig.HubBasePortNumber, + .InstanceLimit = ServerConfig.HubInstanceLimit}, + ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"), + m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); } + : Hub::ProvisionModuleCallbackFunc{}, + m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); } + : Hub::ProvisionModuleCallbackFunc{}); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); @@ -181,6 +276,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) } void +ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort) +{ + if (ServerConfig.ConsulEndpoint.empty()) + { + ZEN_INFO("Consul registration disabled (no endpoint configured)"); + return; + } + + ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint); + + try + { + m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint); + + consul::ServiceRegistrationInfo Info; + Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); + Info.ServiceName = "zen-hub"; + // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? + Info.Port = static_cast<uint16_t>(EffectivePort); + Info.HealthEndpoint = "hub/health"; + Info.Tags = std::vector<std::pair<std::string, std::string>>{ + std::make_pair("zen-hub", Info.ServiceId), + std::make_pair("version", std::string(ZEN_CFG_VERSION)), + std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)), + std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)), + std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))}; + + m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info); + + ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId); + } + catch (const std::exception& Ex) + { + // REQ-F-12: Hub should start successfully even if Consul registration fails + ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what()); + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); + } +} + +void ZenHubServer::Run() { if (m_ProcessMonitor.IsActive()) @@ -228,6 +364,21 @@ ZenHubServer::Run() const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + if (m_ConsulRegistration) + { + if (!m_ConsulRegistration->IsRegistered()) + { + ZEN_INFO("Waiting for consul integration to register..."); + m_ConsulRegistration->WaitForReadyEvent(2000); + } + if (!m_ConsulRegistration->IsRegistered()) + { + m_ConsulClient.reset(); + m_ConsulRegistration.reset(); + ZEN_WARN("Consul registration failed, running without consul integration"); + } + } + SetNewState(kRunning); OnReady(); |