diff options
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 251 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 62 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 25 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 33 |
4 files changed, 367 insertions, 4 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index 6ddebf97a..272096ebb 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -2,11 +2,14 @@ #include <zenutil/consul.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/except_fmt.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <fmt/format.h> @@ -30,7 +33,8 @@ struct ConsulProcess::Impl CreateProcOptions Options; Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; - CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); + const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL); + CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { @@ -107,9 +111,9 @@ ConsulClient::~ConsulClient() void ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) { - IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); - HttpClient::Response Result = - m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}}); + IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); + ValueBuffer.SetContentType(HttpContentType::kText); + HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, HttpClient::Accept(HttpContentType::kJSON)); if (!Result) { throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage("")); @@ -137,4 +141,243 @@ ConsulClient::DeleteKey(std::string_view Key) } } +bool +ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) +{ + using namespace std::literals; + + CbObjectWriter Writer; + { + Writer.AddString("ID"sv, Info.ServiceId); + Writer.AddString("Name"sv, Info.ServiceName); + if (!Info.Address.empty()) + { + Writer.AddString("Address"sv, Info.Address); + } + Writer.AddInteger("Port"sv, Info.Port); + if (!Info.Tags.empty()) + { + Writer.BeginArray("Tags"sv); + for (const std::pair<std::string, std::string>& Tag : Info.Tags) + { + Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second)); + } + Writer.EndArray(); // Tags + } + Writer.BeginObject("Check"sv); + { + Writer.AddString("HTTP"sv, fmt::format("http://{}:{}/{}", Info.Address, Info.Port, Info.HealthEndpoint)); + Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds)); + Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds)); + } + Writer.EndObject(); // Check + } + + ExtendableStringBuilder<512> SB; + CompactBinaryToJson(Writer.Save(), SB); + + IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); + PayloadBuffer.SetContentType(HttpContentType::kJSON); + HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, HttpClient::Accept(HttpContentType::kJSON)); + + if (!Result) + { + ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + HttpClient::Response Result = + m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), HttpClient::Accept(HttpContentType::kJSON)); + + if (!Result) + { + ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId) +{ + using namespace std::literals; + + std::string JsonError; + CbFieldIterator Root = LoadCompactBinaryFromJson(Json, JsonError); + if (Root && JsonError.empty()) + { + for (CbFieldView RootIterator : Root) + { + CbObjectView ServiceObject = RootIterator.AsObjectView(); + if (ServiceObject) + { + for (CbFieldView ServiceIterator : ServiceObject) + { + CbObjectView ObjectIterator = ServiceIterator.AsObjectView(); + if (ObjectIterator["ID"sv].AsString() == ServiceId) + { + return true; + } + } + } + } + } + return false; +} + +bool +ConsulClient::HasService(std::string_view ServiceId) +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return false; + } + return FindServiceInJson(Result.AsText(), ServiceId); +} + +bool +ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds) +{ + // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter + // governs the server-side bound on the blocking query. Do not add a separate client timeout. + HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}}); + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", {}, Parameters); + if (!Result) + { + return false; + } + + auto IndexIt = Result.Header->find("X-Consul-Index"); + if (IndexIt != Result.Header->end()) + { + std::optional<uint64_t> ParsedIndex = ParseInt<uint64_t>(IndexIt->second); + if (ParsedIndex.has_value()) + { + InOutIndex = ParsedIndex.value(); + } + } + + return FindServiceInJson(Result.AsText(), ServiceId); +} + +std::string +ConsulClient::GetAgentServicesJson() +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +////////////////////////////////////////////////////////////////////////// + +ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) +{ + m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); +} + +ServiceRegistration::~ServiceRegistration() +{ + try + { + m_StopEvent.Set(); + + if (m_RegistrationThread.joinable()) + { + m_RegistrationThread.join(); + } + + if (m_IsRegistered.load()) + { + if (!m_Client->DeregisterService(m_Info.ServiceId)) + { + ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what()); + } +} + +void +ServiceRegistration::WaitForReadyEvent(int MaxWaitMS) +{ + if (m_IsRegistered.load()) + { + return; + } + m_ReadyWakeupEvent.Wait(MaxWaitMS); +} + +void +ServiceRegistration::RegistrationLoop() +{ + try + { + SetCurrentThreadName("ConsulRegistration"); + + // Exponential backoff delays: 100ms, 200ms, 400ms + constexpr int BackoffDelaysMs[] = {100, 200, 400}; + constexpr int MaxAttempts = 3; + constexpr int RetryIntervalMs = 5000; + + while (true) + { + bool Succeeded = false; + // Try to register with exponential backoff + for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt) + { + if (m_Client->RegisterService(m_Info)) + { + Succeeded = true; + break; + } + + if (m_StopEvent.Wait(BackoffDelaysMs[Attempt])) + { + return; + } + } + + if (Succeeded || m_Client->RegisterService(m_Info)) + { + break; + } + + // All attempts failed, log warning and wait before retrying + ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", + m_Info.ServiceId, + MaxAttempts + 1, + RetryIntervalMs / 1000); + + // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor + // wakes this immediately so the thread exits without polling. + if (m_StopEvent.Wait(RetryIntervalMs)) + { + return; + } + } + + ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); + m_IsRegistered.store(true); + m_ReadyWakeupEvent.Set(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what()); + } +} + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index 08871fa66..b5bfa42d1 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -5,11 +5,26 @@ #include <zenbase/zenbase.h> #include <zenhttp/httpclient.h> +#include <atomic> +#include <cstdint> #include <string> #include <string_view> +#include <thread> namespace zen::consul { +struct ServiceRegistrationInfo +{ + std::string ServiceId; + std::string ServiceName; + std::string Address; + uint16_t Port = 0; + std::string HealthEndpoint; + std::vector<std::pair<std::string, std::string>> Tags; + int HealthIntervalSeconds = 10; + int DeregisterAfterSeconds = 30; +}; + class ConsulClient { public: @@ -23,7 +38,22 @@ public: std::string GetKeyValue(std::string_view Key); void DeleteKey(std::string_view Key); + bool RegisterService(const ServiceRegistrationInfo& Info); + bool DeregisterService(std::string_view ServiceId); + + // Query methods for testing + bool HasService(std::string_view ServiceId); + std::string GetAgentServicesJson(); + + // Blocking query on v1/agent/services. Blocks until the service list changes or + // the wait period expires. InOutIndex must be 0 for the first call; it is updated + // to the new X-Consul-Index value on success (left unchanged on error). + // Returns true if ServiceId is present in the response. + bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds); + private: + static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId); + HttpClient m_HttpClient; }; @@ -44,4 +74,36 @@ private: std::unique_ptr<Impl> m_Impl; }; +/** + * RAII wrapper for Consul service registration. + * + * Manages the lifecycle of a Consul service registration with: + * - Async registration in a background thread + * - Exponential backoff retry on failure (100ms, 200ms, 400ms) + * - Continuous background retry every 5 seconds after initial failures until registration succeeds + * - Automatic deregistration on destruction + */ +class ServiceRegistration +{ +public: + ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info); + ~ServiceRegistration(); + + ServiceRegistration(const ServiceRegistration&) = delete; + ServiceRegistration& operator=(const ServiceRegistration&) = delete; + + bool IsRegistered() const { return m_IsRegistered.load(); } + void WaitForReadyEvent(int MaxWaitMS); + +private: + ConsulClient* m_Client; + ServiceRegistrationInfo m_Info; + std::atomic<bool> m_IsRegistered{false}; + std::thread m_RegistrationThread; + Event m_ReadyWakeupEvent; + Event m_StopEvent; + + void RegistrationLoop(); +}; + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 2a8617162..1b8750628 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -29,9 +29,34 @@ class CbObject; class ZenServerEnvironment { public: + enum EStorageTag + { + Storage + }; + enum EHubTag + { + Hub + }; + enum ETestTag + { + Test + }; + ZenServerEnvironment(); ~ZenServerEnvironment(); + ZenServerEnvironment(ZenServerEnvironment&&); + + ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir); + ZenServerEnvironment(EHubTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass = ""); + ZenServerEnvironment(ETestTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass = ""); + void Initialize(std::filesystem::path ProgramBaseDir); void InitializeForTest(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = ""); void InitializeForHub(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = ""); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index b09c2d89a..b9c50be4f 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -503,6 +503,39 @@ ZenServerEnvironment::~ZenServerEnvironment() { } +ZenServerEnvironment::ZenServerEnvironment(ZenServerEnvironment&& Other) +: m_ProgramBaseDir(std::move(Other.m_ProgramBaseDir)) +, m_ChildProcessBaseDir(std::move(Other.m_ChildProcessBaseDir)) +, m_IsInitialized(Other.m_IsInitialized) +, m_IsTestInstance(Other.m_IsTestInstance) +, m_IsHubInstance(Other.m_IsHubInstance) +, m_PassthroughOutput(Other.m_PassthroughOutput) +, m_ServerClass(std::move(Other.m_ServerClass)) +, m_NextPortNumber(Other.m_NextPortNumber.load()) +{ +} + +ZenServerEnvironment::ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir) +{ + Initialize(ProgramBaseDir); +} + +ZenServerEnvironment::ZenServerEnvironment(EHubTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass) +{ + InitializeForHub(ProgramBaseDir, TestBaseDir, ServerClass); +} + +ZenServerEnvironment::ZenServerEnvironment(ETestTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass) +{ + InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); +} + void ZenServerEnvironment::Initialize(std::filesystem::path ProgramBaseDir) { |