aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-03 20:45:52 +0100
committerDan Engelbrecht <[email protected]>2026-03-03 20:45:52 +0100
commit610dac1a31e19edb5fb9ba9080487ad68ddf85d8 (patch)
tree8c384f4dda2eef7530e08e4a45c23b507389000c /src
parentfix objectstore uri path parsing (#801) (diff)
downloadzen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.tar.xz
zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.zip
initial implementation from
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/diag/diagsvcs.cpp4
-rw-r--r--src/zenserver/hub/hubservice.cpp71
-rw-r--r--src/zenserver/hub/hubservice.h5
-rw-r--r--src/zenserver/hub/zenhubserver.cpp55
-rw-r--r--src/zenserver/hub/zenhubserver.h9
-rw-r--r--src/zenutil/consul/consul.cpp150
-rw-r--r--src/zenutil/include/zenutil/consul.h51
7 files changed, 335 insertions, 10 deletions
diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp
index d8d53b0e3..e6811ef04 100644
--- a/src/zenserver/diag/diagsvcs.cpp
+++ b/src/zenserver/diag/diagsvcs.cpp
@@ -36,7 +36,7 @@ HttpHealthService::HttpHealthService()
"",
[](HttpRouterRequest& RoutedReq) {
HttpServerRequest& HttpReq = RoutedReq.ServerRequest();
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv);
+ HttpReq.WriteResponse(HttpResponseCode::OK);
},
HttpVerb::kGet);
@@ -118,7 +118,7 @@ HttpHealthService::HandleRequest(HttpServerRequest& Request)
ZEN_MEMSCOPE(GetHealthTag());
if (!m_Router.HandleRequest(Request))
{
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv);
+ Request.WriteResponse(HttpResponseCode::OK);
}
}
diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp
index bf0e294c5..cc34e86bb 100644
--- a/src/zenserver/hub/hubservice.cpp
+++ b/src/zenserver/hub/hubservice.cpp
@@ -8,9 +8,9 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/system.h>
+#include <zenutil/consul.h>
#include <zenutil/zenserverprocess.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -135,7 +135,8 @@ struct StorageServerInstance
StorageServerInstance(ZenServerEnvironment& RunEnvironment,
std::string_view ModuleId,
std::filesystem::path FileHydrationPath,
- std::filesystem::path HydrationTempPath);
+ std::filesystem::path HydrationTempPath,
+ consul::ConsulClient* ConsulClient = nullptr);
~StorageServerInstance();
void Provision();
@@ -169,6 +170,7 @@ private:
#if ZEN_PLATFORM_WINDOWS
JobObject* m_JobObject = nullptr;
#endif
+ consul::ConsulClient* m_ConsulClient;
void SpawnServerProcess();
@@ -179,10 +181,12 @@ private:
StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
std::string_view ModuleId,
std::filesystem::path FileHydrationPath,
- std::filesystem::path HydrationTempPath)
+ std::filesystem::path HydrationTempPath,
+ consul::ConsulClient* ConsulClient)
: m_ModuleId(ModuleId)
, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
, m_HydrationPath(FileHydrationPath)
+, m_ConsulClient(ConsulClient)
{
m_BaseDir = RunEnvironment.CreateChildDir(ModuleId);
m_TempDir = HydrationTempPath / ModuleId;
@@ -234,6 +238,28 @@ StorageServerInstance::Provision()
SpawnServerProcess();
}
+ // Register with Consul if client is available
+ if (m_ConsulClient != nullptr)
+ {
+ consul::ServiceRegistrationInfo Info;
+ Info.ServiceId = m_ModuleId;
+ Info.ServiceName = "zen-service";
+ Info.Address = "localhost";
+ Info.Port = GetBasePort();
+ Info.HealthEndpoint = "health";
+ Info.HealthIntervalSeconds = 10;
+ Info.DeregisterAfterSeconds = 30;
+
+ if (!m_ConsulClient->RegisterService(Info))
+ {
+ ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", m_ModuleId);
+ }
+ else
+ {
+ ZEN_INFO("Registered storage server instance for module '{}' with Consul as '{}'", m_ModuleId, Info.ServiceName);
+ }
+ }
+
m_IsProvisioned = true;
}
@@ -251,6 +277,19 @@ StorageServerInstance::Deprovision()
ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
+ // Deregister from Consul before shutdown
+ if (m_ConsulClient != nullptr)
+ {
+ if (!m_ConsulClient->DeregisterService(m_ModuleId))
+ {
+ ZEN_WARN("Failed to deregister storage server instance for module '{}' from Consul, continuing anyway", m_ModuleId);
+ }
+ else
+ {
+ ZEN_INFO("Deregistered storage server instance for module '{}' from Consul", m_ModuleId);
+ }
+ }
+
m_ServerInstance.Shutdown();
Dehydrate();
@@ -445,9 +484,12 @@ struct HttpHubService::Impl
return false;
}
- IsNewInstance = true;
- auto NewInstance =
- std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath);
+ IsNewInstance = true;
+ auto NewInstance = std::make_unique<StorageServerInstance>(m_RunEnvironment,
+ ModuleId,
+ m_FileHydrationPath,
+ m_HydrationTempPath,
+ m_ConsulClient);
#if ZEN_PLATFORM_WINDOWS
if (m_JobObject.IsValid())
{
@@ -625,6 +667,7 @@ private:
std::unordered_set<std::string> m_DeprovisioningModules;
std::unordered_set<std::string> m_ProvisioningModules;
int m_MaxInstanceCount = 0;
+ consul::ConsulClient* m_ConsulClient = nullptr;
void UpdateStats();
// Capacity tracking
@@ -635,6 +678,9 @@ private:
void UpdateCapacityMetrics();
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+
+public:
+ void SetConsulClient(consul::ConsulClient* Client) { m_ConsulClient = Client; }
};
HttpHubService::Impl::Impl()
@@ -718,7 +764,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool {
for (const auto C : Str)
{
- if (std::isalnum(C) || C == '-')
+ if (std::isalnum(C) || C == '-' || C == ':')
{
// fine
}
@@ -749,6 +795,11 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "health",
+ [](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"modules/{moduleid}",
[this](HttpRouterRequest& Req) {
std::string_view ModuleId = Req.GetCapture(1);
@@ -874,6 +925,12 @@ HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEnd
}
void
+HttpHubService::SetConsulClient(consul::ConsulClient* Client)
+{
+ m_Impl->SetConsulClient(Client);
+}
+
+void
HttpHubService::HandleRequest(zen::HttpServerRequest& Request)
{
m_Router.HandleRequest(Request);
diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h
index ef24bba69..0c5b5adb9 100644
--- a/src/zenserver/hub/hubservice.h
+++ b/src/zenserver/hub/hubservice.h
@@ -6,6 +6,10 @@
#include "hydration.h"
+namespace zen::consul {
+class ConsulClient;
+}
+
namespace zen {
/** ZenServer Hub Service
@@ -34,6 +38,7 @@ public:
* crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
*/
void SetUseJobObject(bool Enable);
+ void SetConsulClient(consul::ConsulClient* Client);
private:
HttpRequestRouter m_Router;
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index d0a0db417..388008350 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -36,6 +36,13 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Instance ID for use in notifications",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
+
+ Options.add_option("hub",
+ "",
+ "consul-endpoint",
+ "Consul endpoint URL for service registration (empty = disabled)",
+ cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""),
+ "");
}
void
@@ -97,6 +104,9 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
InitializeServices(ServerConfig);
RegisterServices(ServerConfig);
+ // Initialize Consul registration after services are set up (REQ-F-12: non-blocking, failure tolerant)
+ InitializeConsulRegistration(ServerConfig, EffectiveBasePort);
+
ZenServerBase::Finalize();
return EffectiveBasePort;
@@ -109,6 +119,10 @@ ZenHubServer::Cleanup()
ZEN_INFO(ZEN_APP_NAME " cleaning up");
try
{
+ // Deregister from Consul first (destructor triggers deregistration)
+ m_ConsulRegistration.reset();
+ m_ConsulClient.reset();
+
m_IoContext.stop();
if (m_IoRunner.joinable())
{
@@ -162,6 +176,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig)
}
void
+ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort)
+{
+ if (ServerConfig.ConsulEndpoint.empty())
+ {
+ ZEN_INFO("Consul registration disabled (no endpoint configured)");
+ return;
+ }
+
+ ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint);
+
+ try
+ {
+ m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint);
+
+ consul::ServiceRegistrationInfo Info;
+ Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId);
+ Info.ServiceName = "zen-hub";
+ Info.Address = "localhost";
+ Info.Port = static_cast<uint16_t>(EffectivePort);
+ Info.HealthEndpoint = "hub/health";
+
+ m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info);
+
+ // Pass Consul client to hub service for child registration
+ if (m_HubService)
+ {
+ m_HubService->SetConsulClient(m_ConsulClient.get());
+ }
+
+ ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId);
+ }
+ catch (const std::exception& Ex)
+ {
+ // REQ-F-12: Hub should start successfully even if Consul registration fails
+ ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what());
+ m_ConsulRegistration.reset();
+ m_ConsulClient.reset();
+ }
+}
+
+void
ZenHubServer::Run()
{
if (m_ProcessMonitor.IsActive())
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index ac14362f0..435d8d248 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -4,6 +4,8 @@
#include "zenserver.h"
+#include <zenutil/consul.h>
+
namespace cxxopts {
class Options;
}
@@ -19,7 +21,8 @@ class HttpHubService;
struct ZenHubServerConfig : public ZenServerConfig
{
std::string UpstreamNotificationEndpoint;
- std::string InstanceId; // For use in notifications
+ std::string InstanceId; // For use in notifications
+ std::string ConsulEndpoint; // If set, enables Consul service registration
};
struct ZenHubServerConfigurator : public ZenServerConfiguratorBase
@@ -84,9 +87,13 @@ private:
std::unique_ptr<HttpHubService> m_HubService;
std::unique_ptr<HttpApiService> m_ApiService;
+ std::unique_ptr<consul::ConsulClient> m_ConsulClient;
+ std::unique_ptr<consul::ServiceRegistration> m_ConsulRegistration;
+
void InitializeState(const ZenHubServerConfig& ServerConfig);
void InitializeServices(const ZenHubServerConfig& ServerConfig);
void RegisterServices(const ZenHubServerConfig& ServerConfig);
+ void InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort);
};
} // namespace zen
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index 6ddebf97a..ef13137d9 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -7,6 +7,7 @@
#include <zencore/logging.h>
#include <zencore/process.h>
#include <zencore/string.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <fmt/format.h>
@@ -137,4 +138,153 @@ ConsulClient::DeleteKey(std::string_view Key)
}
}
+bool
+ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
+{
+ std::string JsonPayload = fmt::format(
+ R"({{"ID":"{}","Name":"{}","Address":"{}","Port":{},"Check":{{"HTTP":"http://{}:{}/{}","Interval":"{}s","DeregisterCriticalServiceAfter":"{}s"}}}})",
+ Info.ServiceId,
+ Info.ServiceName,
+ Info.Address,
+ Info.Port,
+ Info.Address,
+ Info.Port,
+ Info.HealthEndpoint,
+ Info.HealthIntervalSeconds,
+ Info.DeregisterAfterSeconds);
+
+ IoBuffer PayloadBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(JsonPayload));
+ HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register",
+ PayloadBuffer,
+ {{"Content-Type", "application/json"}, {"Accept", "application/json"}});
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+ HttpClient::Response Result =
+ m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, {{"Accept", "application/json"}});
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::HasService(std::string_view ServiceId)
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return false;
+ }
+
+ std::string ServicesJson = Result.ToText();
+ // Simple string search for the service ID in the JSON response
+ return ServicesJson.find(fmt::format("\"{}\"", ServiceId)) != std::string::npos;
+}
+
+std::string
+ConsulClient::GetAgentServicesJson()
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return "{}";
+ }
+ return Result.ToText();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
+{
+ m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this);
+}
+
+ServiceRegistration::~ServiceRegistration()
+{
+ m_ShouldStop.store(true);
+
+ if (m_RegistrationThread.joinable())
+ {
+ m_RegistrationThread.join();
+ }
+
+ if (m_IsRegistered.load())
+ {
+ if (!m_Client->DeregisterService(m_Info.ServiceId))
+ {
+ ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
+ }
+ }
+}
+
+void
+ServiceRegistration::RegistrationLoop()
+{
+ SetCurrentThreadName("ConsulRegistration");
+
+ // Exponential backoff delays: 100ms, 200ms, 400ms
+ constexpr int BackoffDelaysMs[] = {100, 200, 400};
+ constexpr int MaxAttempts = 3;
+ constexpr int RetryIntervalMs = 5000;
+
+ while (!m_ShouldStop.load())
+ {
+ // Try to register with exponential backoff
+ bool Registered = false;
+
+ for (int Attempt = 0; Attempt < MaxAttempts && !m_ShouldStop.load(); ++Attempt)
+ {
+ if (m_Client->RegisterService(m_Info))
+ {
+ m_IsRegistered.store(true);
+ ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId);
+ Registered = true;
+ break;
+ }
+
+ // Apply backoff delay before next attempt (except after last attempt)
+ if (Attempt < MaxAttempts - 1)
+ {
+ Sleep(BackoffDelaysMs[Attempt]);
+ }
+ }
+
+ if (Registered)
+ {
+ // Registration succeeded, exit the loop
+ return;
+ }
+
+ // All attempts failed, log warning and wait before retrying
+ ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s",
+ m_Info.ServiceId,
+ MaxAttempts,
+ RetryIntervalMs / 1000);
+
+ // Wait for 5 seconds before retrying, checking for stop signal periodically
+ constexpr int CheckIntervalMs = 100;
+ int WaitedMs = 0;
+ while (WaitedMs < RetryIntervalMs && !m_ShouldStop.load())
+ {
+ Sleep(CheckIntervalMs);
+ WaitedMs += CheckIntervalMs;
+ }
+ }
+}
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h
index 08871fa66..74e6abce4 100644
--- a/src/zenutil/include/zenutil/consul.h
+++ b/src/zenutil/include/zenutil/consul.h
@@ -5,11 +5,25 @@
#include <zenbase/zenbase.h>
#include <zenhttp/httpclient.h>
+#include <atomic>
+#include <cstdint>
#include <string>
#include <string_view>
+#include <thread>
namespace zen::consul {
+struct ServiceRegistrationInfo
+{
+ std::string ServiceId;
+ std::string ServiceName;
+ std::string Address;
+ uint16_t Port = 0;
+ std::string HealthEndpoint;
+ int HealthIntervalSeconds = 10;
+ int DeregisterAfterSeconds = 30;
+};
+
class ConsulClient
{
public:
@@ -23,6 +37,13 @@ public:
std::string GetKeyValue(std::string_view Key);
void DeleteKey(std::string_view Key);
+ bool RegisterService(const ServiceRegistrationInfo& Info);
+ bool DeregisterService(std::string_view ServiceId);
+
+ // Query methods for testing
+ bool HasService(std::string_view ServiceId);
+ std::string GetAgentServicesJson();
+
private:
HttpClient m_HttpClient;
};
@@ -44,4 +65,34 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+/**
+ * RAII wrapper for Consul service registration.
+ *
+ * Manages the lifecycle of a Consul service registration with:
+ * - Async registration in a background thread
+ * - Exponential backoff retry on failure (100ms, 200ms, 400ms)
+ * - Continuous background retry every 5 seconds after initial failures
+ * - Automatic deregistration on destruction
+ */
+class ServiceRegistration
+{
+public:
+ ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info);
+ ~ServiceRegistration();
+
+ ServiceRegistration(const ServiceRegistration&) = delete;
+ ServiceRegistration& operator=(const ServiceRegistration&) = delete;
+
+ bool IsRegistered() const { return m_IsRegistered.load(); }
+
+private:
+ ConsulClient* m_Client;
+ ServiceRegistrationInfo m_Info;
+ std::atomic<bool> m_IsRegistered{false};
+ std::atomic<bool> m_ShouldStop{false};
+ std::thread m_RegistrationThread;
+
+ void RegistrationLoop();
+};
+
} // namespace zen::consul