aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-03 20:45:52 +0100
committerDan Engelbrecht <[email protected]>2026-03-03 20:45:52 +0100
commit610dac1a31e19edb5fb9ba9080487ad68ddf85d8 (patch)
tree8c384f4dda2eef7530e08e4a45c23b507389000c /src/zenutil
parentfix objectstore uri path parsing (#801) (diff)
downloadzen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.tar.xz
zen-610dac1a31e19edb5fb9ba9080487ad68ddf85d8.zip
initial implementation from
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/consul/consul.cpp150
-rw-r--r--src/zenutil/include/zenutil/consul.h51
2 files changed, 201 insertions, 0 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index 6ddebf97a..ef13137d9 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -7,6 +7,7 @@
#include <zencore/logging.h>
#include <zencore/process.h>
#include <zencore/string.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <fmt/format.h>
@@ -137,4 +138,153 @@ ConsulClient::DeleteKey(std::string_view Key)
}
}
+bool
+ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
+{
+ std::string JsonPayload = fmt::format(
+ R"({{"ID":"{}","Name":"{}","Address":"{}","Port":{},"Check":{{"HTTP":"http://{}:{}/{}","Interval":"{}s","DeregisterCriticalServiceAfter":"{}s"}}}})",
+ Info.ServiceId,
+ Info.ServiceName,
+ Info.Address,
+ Info.Port,
+ Info.Address,
+ Info.Port,
+ Info.HealthEndpoint,
+ Info.HealthIntervalSeconds,
+ Info.DeregisterAfterSeconds);
+
+ IoBuffer PayloadBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(JsonPayload));
+ HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register",
+ PayloadBuffer,
+ {{"Content-Type", "application/json"}, {"Accept", "application/json"}});
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+ HttpClient::Response Result =
+ m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, {{"Accept", "application/json"}});
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::HasService(std::string_view ServiceId)
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return false;
+ }
+
+ std::string ServicesJson = Result.ToText();
+ // Simple string search for the service ID in the JSON response
+ return ServicesJson.find(fmt::format("\"{}\"", ServiceId)) != std::string::npos;
+}
+
+std::string
+ConsulClient::GetAgentServicesJson()
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return "{}";
+ }
+ return Result.ToText();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
+{
+ m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this);
+}
+
+ServiceRegistration::~ServiceRegistration()
+{
+ m_ShouldStop.store(true);
+
+ if (m_RegistrationThread.joinable())
+ {
+ m_RegistrationThread.join();
+ }
+
+ if (m_IsRegistered.load())
+ {
+ if (!m_Client->DeregisterService(m_Info.ServiceId))
+ {
+ ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
+ }
+ }
+}
+
+void
+ServiceRegistration::RegistrationLoop()
+{
+ SetCurrentThreadName("ConsulRegistration");
+
+ // Exponential backoff delays: 100ms, 200ms, 400ms
+ constexpr int BackoffDelaysMs[] = {100, 200, 400};
+ constexpr int MaxAttempts = 3;
+ constexpr int RetryIntervalMs = 5000;
+
+ while (!m_ShouldStop.load())
+ {
+ // Try to register with exponential backoff
+ bool Registered = false;
+
+ for (int Attempt = 0; Attempt < MaxAttempts && !m_ShouldStop.load(); ++Attempt)
+ {
+ if (m_Client->RegisterService(m_Info))
+ {
+ m_IsRegistered.store(true);
+ ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId);
+ Registered = true;
+ break;
+ }
+
+ // Apply backoff delay before next attempt (except after last attempt)
+ if (Attempt < MaxAttempts - 1)
+ {
+ Sleep(BackoffDelaysMs[Attempt]);
+ }
+ }
+
+ if (Registered)
+ {
+ // Registration succeeded, exit the loop
+ return;
+ }
+
+ // All attempts failed, log warning and wait before retrying
+ ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s",
+ m_Info.ServiceId,
+ MaxAttempts,
+ RetryIntervalMs / 1000);
+
+ // Wait for 5 seconds before retrying, checking for stop signal periodically
+ constexpr int CheckIntervalMs = 100;
+ int WaitedMs = 0;
+ while (WaitedMs < RetryIntervalMs && !m_ShouldStop.load())
+ {
+ Sleep(CheckIntervalMs);
+ WaitedMs += CheckIntervalMs;
+ }
+ }
+}
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h
index 08871fa66..74e6abce4 100644
--- a/src/zenutil/include/zenutil/consul.h
+++ b/src/zenutil/include/zenutil/consul.h
@@ -5,11 +5,25 @@
#include <zenbase/zenbase.h>
#include <zenhttp/httpclient.h>
+#include <atomic>
+#include <cstdint>
#include <string>
#include <string_view>
+#include <thread>
namespace zen::consul {
+struct ServiceRegistrationInfo
+{
+ std::string ServiceId;
+ std::string ServiceName;
+ std::string Address;
+ uint16_t Port = 0;
+ std::string HealthEndpoint;
+ int HealthIntervalSeconds = 10;
+ int DeregisterAfterSeconds = 30;
+};
+
class ConsulClient
{
public:
@@ -23,6 +37,13 @@ public:
std::string GetKeyValue(std::string_view Key);
void DeleteKey(std::string_view Key);
+ bool RegisterService(const ServiceRegistrationInfo& Info);
+ bool DeregisterService(std::string_view ServiceId);
+
+ // Query methods for testing
+ bool HasService(std::string_view ServiceId);
+ std::string GetAgentServicesJson();
+
private:
HttpClient m_HttpClient;
};
@@ -44,4 +65,34 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+/**
+ * RAII wrapper for Consul service registration.
+ *
+ * Manages the lifecycle of a Consul service registration with:
+ * - Async registration in a background thread
+ * - Exponential backoff retry on failure (100ms, 200ms, 400ms)
+ * - Continuous background retry every 5 seconds after initial failures
+ * - Automatic deregistration on destruction
+ */
+class ServiceRegistration
+{
+public:
+ ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info);
+ ~ServiceRegistration();
+
+ ServiceRegistration(const ServiceRegistration&) = delete;
+ ServiceRegistration& operator=(const ServiceRegistration&) = delete;
+
+ bool IsRegistered() const { return m_IsRegistered.load(); }
+
+private:
+ ConsulClient* m_Client;
+ ServiceRegistrationInfo m_Info;
+ std::atomic<bool> m_IsRegistered{false};
+ std::atomic<bool> m_ShouldStop{false};
+ std::thread m_RegistrationThread;
+
+ void RegistrationLoop();
+};
+
} // namespace zen::consul