aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/consul/consul.cpp251
-rw-r--r--src/zenutil/include/zenutil/consul.h62
-rw-r--r--src/zenutil/include/zenutil/zenserverprocess.h25
-rw-r--r--src/zenutil/zenserverprocess.cpp33
4 files changed, 367 insertions, 4 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index 6ddebf97a..272096ebb 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -2,11 +2,14 @@
#include <zenutil/consul.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/process.h>
#include <zencore/string.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <fmt/format.h>
@@ -30,7 +33,8 @@ struct ConsulProcess::Impl
CreateProcOptions Options;
Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup;
- CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
+ const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL);
+ CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
if (Result)
{
@@ -107,9 +111,9 @@ ConsulClient::~ConsulClient()
void
ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value)
{
- IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
- HttpClient::Response Result =
- m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}});
+ IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
+ ValueBuffer.SetContentType(HttpContentType::kText);
+ HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, HttpClient::Accept(HttpContentType::kJSON));
if (!Result)
{
throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage(""));
@@ -137,4 +141,243 @@ ConsulClient::DeleteKey(std::string_view Key)
}
}
+bool
+ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
+{
+ using namespace std::literals;
+
+ CbObjectWriter Writer;
+ {
+ Writer.AddString("ID"sv, Info.ServiceId);
+ Writer.AddString("Name"sv, Info.ServiceName);
+ if (!Info.Address.empty())
+ {
+ Writer.AddString("Address"sv, Info.Address);
+ }
+ Writer.AddInteger("Port"sv, Info.Port);
+ if (!Info.Tags.empty())
+ {
+ Writer.BeginArray("Tags"sv);
+ for (const std::pair<std::string, std::string>& Tag : Info.Tags)
+ {
+ Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second));
+ }
+ Writer.EndArray(); // Tags
+ }
+ Writer.BeginObject("Check"sv);
+ {
+ Writer.AddString("HTTP"sv, fmt::format("http://{}:{}/{}", Info.Address, Info.Port, Info.HealthEndpoint));
+ Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds));
+ Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds));
+ }
+ Writer.EndObject(); // Check
+ }
+
+ ExtendableStringBuilder<512> SB;
+ CompactBinaryToJson(Writer.Save(), SB);
+
+ IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size());
+ PayloadBuffer.SetContentType(HttpContentType::kJSON);
+ HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, HttpClient::Accept(HttpContentType::kJSON));
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+ HttpClient::Response Result =
+ m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), HttpClient::Accept(HttpContentType::kJSON));
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId)
+{
+ using namespace std::literals;
+
+ std::string JsonError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(Json, JsonError);
+ if (Root && JsonError.empty())
+ {
+ for (CbFieldView RootIterator : Root)
+ {
+ CbObjectView ServiceObject = RootIterator.AsObjectView();
+ if (ServiceObject)
+ {
+ for (CbFieldView ServiceIterator : ServiceObject)
+ {
+ CbObjectView ObjectIterator = ServiceIterator.AsObjectView();
+ if (ObjectIterator["ID"sv].AsString() == ServiceId)
+ {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
+bool
+ConsulClient::HasService(std::string_view ServiceId)
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return false;
+ }
+ return FindServiceInJson(Result.AsText(), ServiceId);
+}
+
+bool
+ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds)
+{
+ // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter
+ // governs the server-side bound on the blocking query. Do not add a separate client timeout.
+ HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}});
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", {}, Parameters);
+ if (!Result)
+ {
+ return false;
+ }
+
+ auto IndexIt = Result.Header->find("X-Consul-Index");
+ if (IndexIt != Result.Header->end())
+ {
+ std::optional<uint64_t> ParsedIndex = ParseInt<uint64_t>(IndexIt->second);
+ if (ParsedIndex.has_value())
+ {
+ InOutIndex = ParsedIndex.value();
+ }
+ }
+
+ return FindServiceInJson(Result.AsText(), ServiceId);
+}
+
+std::string
+ConsulClient::GetAgentServicesJson()
+{
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services");
+ if (!Result)
+ {
+ return "{}";
+ }
+ return Result.ToText();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
+{
+ m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this);
+}
+
+ServiceRegistration::~ServiceRegistration()
+{
+ try
+ {
+ m_StopEvent.Set();
+
+ if (m_RegistrationThread.joinable())
+ {
+ m_RegistrationThread.join();
+ }
+
+ if (m_IsRegistered.load())
+ {
+ if (!m_Client->DeregisterService(m_Info.ServiceId))
+ {
+ ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what());
+ }
+}
+
+void
+ServiceRegistration::WaitForReadyEvent(int MaxWaitMS)
+{
+ if (m_IsRegistered.load())
+ {
+ return;
+ }
+ m_ReadyWakeupEvent.Wait(MaxWaitMS);
+}
+
+void
+ServiceRegistration::RegistrationLoop()
+{
+ try
+ {
+ SetCurrentThreadName("ConsulRegistration");
+
+ // Exponential backoff delays: 100ms, 200ms, 400ms
+ constexpr int BackoffDelaysMs[] = {100, 200, 400};
+ constexpr int MaxAttempts = 3;
+ constexpr int RetryIntervalMs = 5000;
+
+ while (true)
+ {
+ bool Succeeded = false;
+ // Try to register with exponential backoff
+ for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt)
+ {
+ if (m_Client->RegisterService(m_Info))
+ {
+ Succeeded = true;
+ break;
+ }
+
+ if (m_StopEvent.Wait(BackoffDelaysMs[Attempt]))
+ {
+ return;
+ }
+ }
+
+ if (Succeeded || m_Client->RegisterService(m_Info))
+ {
+ break;
+ }
+
+ // All attempts failed, log warning and wait before retrying
+ ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s",
+ m_Info.ServiceId,
+ MaxAttempts + 1,
+ RetryIntervalMs / 1000);
+
+ // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor
+ // wakes this immediately so the thread exits without polling.
+ if (m_StopEvent.Wait(RetryIntervalMs))
+ {
+ return;
+ }
+ }
+
+ ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId);
+ m_IsRegistered.store(true);
+ m_ReadyWakeupEvent.Set();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what());
+ }
+}
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h
index 08871fa66..b5bfa42d1 100644
--- a/src/zenutil/include/zenutil/consul.h
+++ b/src/zenutil/include/zenutil/consul.h
@@ -5,11 +5,26 @@
#include <zenbase/zenbase.h>
#include <zenhttp/httpclient.h>
+#include <atomic>
+#include <cstdint>
#include <string>
#include <string_view>
+#include <thread>
namespace zen::consul {
+struct ServiceRegistrationInfo
+{
+ std::string ServiceId;
+ std::string ServiceName;
+ std::string Address;
+ uint16_t Port = 0;
+ std::string HealthEndpoint;
+ std::vector<std::pair<std::string, std::string>> Tags;
+ int HealthIntervalSeconds = 10;
+ int DeregisterAfterSeconds = 30;
+};
+
class ConsulClient
{
public:
@@ -23,7 +38,22 @@ public:
std::string GetKeyValue(std::string_view Key);
void DeleteKey(std::string_view Key);
+ bool RegisterService(const ServiceRegistrationInfo& Info);
+ bool DeregisterService(std::string_view ServiceId);
+
+ // Query methods for testing
+ bool HasService(std::string_view ServiceId);
+ std::string GetAgentServicesJson();
+
+ // Blocking query on v1/agent/services. Blocks until the service list changes or
+ // the wait period expires. InOutIndex must be 0 for the first call; it is updated
+ // to the new X-Consul-Index value on success (left unchanged on error).
+ // Returns true if ServiceId is present in the response.
+ bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds);
+
private:
+ static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId);
+
HttpClient m_HttpClient;
};
@@ -44,4 +74,36 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+/**
+ * RAII wrapper for Consul service registration.
+ *
+ * Manages the lifecycle of a Consul service registration with:
+ * - Async registration in a background thread
+ * - Exponential backoff retry on failure (100ms, 200ms, 400ms)
+ * - Continuous background retry every 5 seconds after initial failures until registration succeeds
+ * - Automatic deregistration on destruction
+ */
+class ServiceRegistration
+{
+public:
+ ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info);
+ ~ServiceRegistration();
+
+ ServiceRegistration(const ServiceRegistration&) = delete;
+ ServiceRegistration& operator=(const ServiceRegistration&) = delete;
+
+ bool IsRegistered() const { return m_IsRegistered.load(); }
+ void WaitForReadyEvent(int MaxWaitMS);
+
+private:
+ ConsulClient* m_Client;
+ ServiceRegistrationInfo m_Info;
+ std::atomic<bool> m_IsRegistered{false};
+ std::thread m_RegistrationThread;
+ Event m_ReadyWakeupEvent;
+ Event m_StopEvent;
+
+ void RegistrationLoop();
+};
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 2a8617162..1b8750628 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -29,9 +29,34 @@ class CbObject;
class ZenServerEnvironment
{
public:
+ enum EStorageTag
+ {
+ Storage
+ };
+ enum EHubTag
+ {
+ Hub
+ };
+ enum ETestTag
+ {
+ Test
+ };
+
ZenServerEnvironment();
~ZenServerEnvironment();
+ ZenServerEnvironment(ZenServerEnvironment&&);
+
+ ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir);
+ ZenServerEnvironment(EHubTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass = "");
+ ZenServerEnvironment(ETestTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass = "");
+
void Initialize(std::filesystem::path ProgramBaseDir);
void InitializeForTest(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = "");
void InitializeForHub(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = "");
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index b09c2d89a..b9c50be4f 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -503,6 +503,39 @@ ZenServerEnvironment::~ZenServerEnvironment()
{
}
+ZenServerEnvironment::ZenServerEnvironment(ZenServerEnvironment&& Other)
+: m_ProgramBaseDir(std::move(Other.m_ProgramBaseDir))
+, m_ChildProcessBaseDir(std::move(Other.m_ChildProcessBaseDir))
+, m_IsInitialized(Other.m_IsInitialized)
+, m_IsTestInstance(Other.m_IsTestInstance)
+, m_IsHubInstance(Other.m_IsHubInstance)
+, m_PassthroughOutput(Other.m_PassthroughOutput)
+, m_ServerClass(std::move(Other.m_ServerClass))
+, m_NextPortNumber(Other.m_NextPortNumber.load())
+{
+}
+
+ZenServerEnvironment::ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir)
+{
+ Initialize(ProgramBaseDir);
+}
+
+ZenServerEnvironment::ZenServerEnvironment(EHubTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass)
+{
+ InitializeForHub(ProgramBaseDir, TestBaseDir, ServerClass);
+}
+
+ZenServerEnvironment::ZenServerEnvironment(ETestTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass)
+{
+ InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass);
+}
+
void
ZenServerEnvironment::Initialize(std::filesystem::path ProgramBaseDir)
{