aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/zenhubserver.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-11 09:45:31 +0100
committerGitHub Enterprise <[email protected]>2026-03-11 09:45:31 +0100
commite9d04250225a430cffed28e6a49299e3da542f97 (patch)
tree7f99594bd930c072e159458319e822bbb18794f7 /src/zenserver/hub/zenhubserver.cpp
parentminor zenstore/blockstore fixes (#821) (diff)
downloadzen-e9d04250225a430cffed28e6a49299e3da542f97.tar.xz
zen-e9d04250225a430cffed28e6a49299e3da542f97.zip
hub consul integration (#820)
- Feature: Basic consul integration for zenserver hub mode, restricted to host local consul agent and register/deregister of services - Feature: Added new options to zenserver hub mode - `consul-endpoint` - Consul endpoint URL for service registration (empty = disabled) - `hub-base-port-number` - Base port number for provisioned instances - `hub-instance-limit` - Maximum number of provisioned instances for this hub - `hub-use-job-object` - Enable the use of a Windows Job Object for child process management (Windows only)
Diffstat (limited to 'src/zenserver/hub/zenhubserver.cpp')
-rw-r--r--src/zenserver/hub/zenhubserver.cpp155
1 files changed, 153 insertions, 2 deletions
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 64ab0bd9e..a019ff295 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -3,9 +3,10 @@
#include "zenhubserver.h"
#include "frontend/frontend.h"
+#include "httphubservice.h"
#include "hub.h"
-#include "hubservice.h"
+#include <zencore/config.h>
#include <zencore/fmtutils.h>
#include <zencore/memory/llm.h>
#include <zencore/memory/memorytrace.h>
@@ -38,6 +39,35 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Instance ID for use in notifications",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
+
+ Options.add_option("hub",
+ "",
+ "consul-endpoint",
+ "Consul endpoint URL for service registration (empty = disabled)",
+ cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-base-port-number",
+ "Base port number for provisioned instances",
+ cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-limit",
+ "Maximum number of provisioned instances for this hub",
+ cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"),
+ "");
+#if ZEN_PLATFORM_WINDOWS
+ Options.add_option("hub",
+ "",
+ "hub-use-job-object",
+ "Enable the use of a Windows Job Object for child process management",
+ cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"),
+ "");
+#endif // ZEN_PLATFORM_WINDOWS
}
void
@@ -74,6 +104,56 @@ ZenHubServer::~ZenHubServer()
Cleanup();
}
+void
+ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+{
+ if (m_ConsulClient)
+ {
+ consul::ServiceRegistrationInfo ServiceInfo{
+ .ServiceId = std::string(ModuleId),
+ .ServiceName = "zen-storage",
+ // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri?
+ .Port = Info.Port,
+ .HealthEndpoint = "health",
+ .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
+ std::make_pair("zen-hub", std::string(HubInstanceId)),
+ std::make_pair("version", std::string(ZEN_CFG_VERSION))},
+ .HealthIntervalSeconds = 10,
+ .DeregisterAfterSeconds = 30};
+
+ if (!m_ConsulClient->RegisterService(ServiceInfo))
+ {
+ ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId);
+ }
+ else
+ {
+ ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'",
+ ModuleId,
+ Info.Port,
+ ServiceInfo.ServiceName);
+ }
+ }
+}
+
+void
+ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+{
+ ZEN_UNUSED(HubInstanceId);
+ if (m_ConsulClient)
+ {
+ if (!m_ConsulClient->DeregisterService(ModuleId))
+ {
+ ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway",
+ ModuleId,
+ Info.Port);
+ }
+ else
+ {
+ ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
+ }
+ }
+}
+
int
ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
{
@@ -96,6 +176,7 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
InitializeState(ServerConfig);
+ InitializeConsulRegistration(ServerConfig, EffectiveBasePort);
InitializeServices(ServerConfig);
RegisterServices(ServerConfig);
@@ -127,6 +208,9 @@ ZenHubServer::Cleanup()
m_HubService.reset();
m_ApiService.reset();
m_Hub.reset();
+
+ m_ConsulRegistration.reset();
+ m_ConsulClient.reset();
}
catch (const std::exception& Ex)
{
@@ -147,7 +231,18 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
ZEN_INFO("instantiating Hub");
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"});
+ Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit},
+ ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"),
+ m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); }
+ : Hub::ProvisionModuleCallbackFunc{},
+ m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); }
+ : Hub::ProvisionModuleCallbackFunc{});
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
@@ -181,6 +276,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig)
}
void
+ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort)
+{
+ if (ServerConfig.ConsulEndpoint.empty())
+ {
+ ZEN_INFO("Consul registration disabled (no endpoint configured)");
+ return;
+ }
+
+ ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint);
+
+ try
+ {
+ m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint);
+
+ consul::ServiceRegistrationInfo Info;
+ Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId);
+ Info.ServiceName = "zen-hub";
+ // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri?
+ Info.Port = static_cast<uint16_t>(EffectivePort);
+ Info.HealthEndpoint = "hub/health";
+ Info.Tags = std::vector<std::pair<std::string, std::string>>{
+ std::make_pair("zen-hub", Info.ServiceId),
+ std::make_pair("version", std::string(ZEN_CFG_VERSION)),
+ std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)),
+ std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)),
+ std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))};
+
+ m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info);
+
+ ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId);
+ }
+ catch (const std::exception& Ex)
+ {
+ // REQ-F-12: Hub should start successfully even if Consul registration fails
+ ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what());
+ m_ConsulRegistration.reset();
+ m_ConsulClient.reset();
+ }
+}
+
+void
ZenHubServer::Run()
{
if (m_ProcessMonitor.IsActive())
@@ -228,6 +364,21 @@ ZenHubServer::Run()
const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+ if (m_ConsulRegistration)
+ {
+ if (!m_ConsulRegistration->IsRegistered())
+ {
+ ZEN_INFO("Waiting for consul integration to register...");
+ m_ConsulRegistration->WaitForReadyEvent(2000);
+ }
+ if (!m_ConsulRegistration->IsRegistered())
+ {
+ m_ConsulClient.reset();
+ m_ConsulRegistration.reset();
+ ZEN_WARN("Consul registration failed, running without consul integration");
+ }
+ }
+
SetNewState(kRunning);
OnReady();