aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/consul
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/consul')
-rw-r--r--src/zenutil/consul/consul.cpp740
1 files changed, 730 insertions, 10 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index 6ddebf97a..762f06817 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -2,14 +2,25 @@
#include <zenutil/consul.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/process.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zenhttp/httpserver.h>
+
+#include <unordered_set>
+
+ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::consul {
@@ -17,7 +28,7 @@ namespace zen::consul {
struct ConsulProcess::Impl
{
- Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {}
+ Impl(std::string_view BaseUri, std::string_view Token = "") : m_Token(Token), m_HttpClient(BaseUri) {}
~Impl() = default;
void SpawnConsulAgent()
@@ -28,9 +39,10 @@ struct ConsulProcess::Impl
}
CreateProcOptions Options;
- Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup;
+ Options.Flags |= CreateProcOptions::Flag_NewProcessGroup;
- CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
+ const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL);
+ CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
if (Result)
{
@@ -40,10 +52,16 @@ struct ConsulProcess::Impl
// Poll to check when the agent is ready
+ HttpClient::KeyValueMap AdditionalHeaders;
+ if (!m_Token.empty())
+ {
+ AdditionalHeaders.Entries.emplace("X-Consul-Token", m_Token);
+ }
+
do
{
Sleep(100);
- HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader");
+ HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader", AdditionalHeaders);
if (Resp)
{
ZEN_INFO("Consul agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
@@ -71,6 +89,7 @@ struct ConsulProcess::Impl
private:
ProcessHandle m_ProcessHandle;
+ std::string m_Token;
HttpClient m_HttpClient;
};
@@ -96,20 +115,42 @@ ConsulProcess::StopConsulAgent()
//////////////////////////////////////////////////////////////////////////
-ConsulClient::ConsulClient(std::string_view BaseUri) : m_HttpClient(BaseUri)
+ConsulClient::ConsulClient(const Configuration& Config)
+: m_Config(Config)
+, m_HttpClient(m_Config.BaseUri, HttpClientSettings{.ConnectTimeout = m_Config.ConnectTimeout, .Timeout = m_Config.Timeout}, [this] {
+ return m_Stop.load();
+})
{
+ m_Worker = std::thread(&ConsulClient::WorkerLoop, this);
}
ConsulClient::~ConsulClient()
{
+ try
+ {
+ m_Stop.store(true);
+ m_Wakeup.Set();
+ if (m_Worker.joinable())
+ {
+ m_Worker.join();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("ConsulClient::~ConsulClient threw exception: {}", Ex.what());
+ }
}
void
ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value)
{
- IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
- HttpClient::Response Result =
- m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}});
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+ AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON));
+
+ IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
+ ValueBuffer.SetContentType(HttpContentType::kText);
+ HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, AdditionalHeaders);
if (!Result)
{
throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage(""));
@@ -119,7 +160,10 @@ ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value)
std::string
ConsulClient::GetKeyValue(std::string_view Key)
{
- HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key));
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key), AdditionalHeaders);
if (!Result)
{
throw runtime_error("ConsulClient::GetKeyValue() failed to get key '{}' ({})", Key, Result.ErrorMessage(""));
@@ -130,11 +174,687 @@ ConsulClient::GetKeyValue(std::string_view Key)
void
ConsulClient::DeleteKey(std::string_view Key)
{
- HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key));
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key), AdditionalHeaders);
if (!Result)
{
throw runtime_error("ConsulClient::DeleteKey() failed to delete key '{}' ({})", Key, Result.ErrorMessage(""));
}
}
+void
+ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
+{
+ PendingOp Op{PendingOp::Kind::Register, Info};
+ m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); });
+ m_Wakeup.Set();
+}
+
+void
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+ PendingOp Op;
+ Op.Type = PendingOp::Kind::Deregister;
+ Op.Info.ServiceId = std::string(ServiceId);
+ m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); });
+ m_Wakeup.Set();
+}
+
+bool
+ConsulClient::DoRegister(const ServiceRegistrationInfo& Info)
+{
+ using namespace std::literals;
+
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+ AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON));
+
+ HttpClient::KeyValueMap AdditionalParameters(std::make_pair<std::string, std::string>("replace-existing-checks", "true"));
+
+ CbObjectWriter Writer;
+ {
+ Writer.AddString("ID"sv, Info.ServiceId);
+ Writer.AddString("Name"sv, Info.ServiceName);
+ if (!Info.Address.empty())
+ {
+ Writer.AddString("Address"sv, Info.Address);
+ }
+ Writer.AddInteger("Port"sv, Info.Port);
+ if (!Info.Tags.empty())
+ {
+ Writer.BeginArray("Tags"sv);
+ for (const std::pair<std::string, std::string>& Tag : Info.Tags)
+ {
+ Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second));
+ }
+ Writer.EndArray(); // Tags
+ }
+ if (Info.HealthIntervalSeconds != 0)
+ {
+ // Consul requires Interval whenever HTTP is specified; omit the Check block entirely
+ // when no interval is configured (e.g. during Provisioning).
+ Writer.BeginObject("Check"sv);
+ {
+ Writer.AddString(
+ "HTTP"sv,
+ fmt::format("http://{}:{}/{}", Info.Address.empty() ? "localhost" : Info.Address, Info.Port, Info.HealthEndpoint));
+ Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds));
+ if (Info.DeregisterAfterSeconds != 0)
+ {
+ Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds));
+ }
+ if (!Info.InitialStatus.empty())
+ {
+ Writer.AddString("Status"sv, Info.InitialStatus);
+ }
+ }
+ Writer.EndObject(); // Check
+ }
+ }
+
+ ExtendableStringBuilder<512> SB;
+ CompactBinaryToJson(Writer.Save(), SB);
+
+ IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size());
+ PayloadBuffer.SetContentType(HttpContentType::kJSON);
+ HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, AdditionalHeaders, AdditionalParameters);
+
+ if (!Result)
+ {
+ ZEN_WARN("ConsulClient::DoRegister() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+ return false;
+ }
+
+ return true;
+}
+
+bool
+ConsulClient::DoDeregister(std::string_view ServiceId)
+{
+ using namespace std::literals;
+
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+ AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON));
+
+ HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, AdditionalHeaders);
+ if (Result)
+ {
+ return true;
+ }
+
+ // Agent deregister failed - fall back to catalog deregister.
+ // This handles cases where the service was registered via a different Consul agent
+ // (e.g. load-balanced endpoint routing to different agents).
+ std::string NodeName = GetNodeName();
+ if (!NodeName.empty())
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("Node"sv, NodeName);
+ Writer.AddString("ServiceID"sv, ServiceId);
+
+ ExtendableStringBuilder<256> SB;
+ CompactBinaryToJson(Writer.Save(), SB);
+
+ IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size());
+ PayloadBuffer.SetContentType(HttpContentType::kJSON);
+
+ HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders);
+ if (CatalogResult)
+ {
+ ZEN_INFO("ConsulClient::DoDeregister() deregistered service '{}' via catalog fallback (agent error: {})",
+ ServiceId,
+ Result.ErrorMessage(""));
+ return true;
+ }
+
+ ZEN_WARN("ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, catalog: {})",
+ ServiceId,
+ Result.ErrorMessage(""),
+ CatalogResult.ErrorMessage(""));
+ }
+ else
+ {
+ ZEN_WARN(
+ "ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, could not determine node name for catalog "
+ "fallback)",
+ ServiceId,
+ Result.ErrorMessage(""));
+ }
+
+ return false;
+}
+
+std::string
+ConsulClient::GetNodeName()
+{
+ using namespace std::literals;
+
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/self", AdditionalHeaders);
+ if (!Result)
+ {
+ return {};
+ }
+
+ std::string JsonError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(Result.AsText(), JsonError);
+ if (!Root || !JsonError.empty())
+ {
+ return {};
+ }
+
+ for (CbFieldView Field : Root)
+ {
+ if (Field.GetName() == "Config"sv)
+ {
+ CbObjectView Config = Field.AsObjectView();
+ if (Config)
+ {
+ return std::string(Config["NodeName"sv].AsString());
+ }
+ }
+ }
+
+ return {};
+}
+
+void
+ConsulClient::ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap)
+{
+ std::string Token;
+ if (!m_Config.StaticToken.empty())
+ {
+ Token = m_Config.StaticToken;
+ }
+ else if (!m_Config.TokenEnvName.empty())
+ {
+ Token = GetEnvVariable(m_Config.TokenEnvName);
+ }
+
+ if (!Token.empty())
+ {
+ InOutHeaderMap.Entries.emplace("X-Consul-Token", Token);
+ }
+}
+
+bool
+ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId)
+{
+ using namespace std::literals;
+
+ std::string JsonError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(Json, JsonError);
+ if (Root && JsonError.empty())
+ {
+ for (CbFieldView RootIterator : Root)
+ {
+ CbObjectView ServiceObject = RootIterator.AsObjectView();
+ if (ServiceObject)
+ {
+ for (CbFieldView ServiceIterator : ServiceObject)
+ {
+ CbObjectView ObjectIterator = ServiceIterator.AsObjectView();
+ if (ObjectIterator["ID"sv].AsString() == ServiceId)
+ {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
+bool
+ConsulClient::HasService(std::string_view ServiceId)
+{
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders);
+ if (!Result)
+ {
+ return false;
+ }
+ return FindServiceInJson(Result.AsText(), ServiceId);
+}
+
+bool
+ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds)
+{
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ // Note: m_HttpClient runs with a 500ms client-side timeout to keep Register/Deregister from
+ // stalling the hub state machine when the agent is unreachable. That bound applies here too:
+ // WaitSeconds is effectively capped at ~500ms regardless of the argument, so callers must
+ // treat this as a short-poll and loop rather than rely on a true blocking query.
+ HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}});
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders, Parameters);
+ if (!Result)
+ {
+ return false;
+ }
+
+ auto IndexIt = Result.Header->find("X-Consul-Index");
+ if (IndexIt != Result.Header->end())
+ {
+ std::optional<uint64_t> ParsedIndex = ParseInt<uint64_t>(IndexIt->second);
+ if (ParsedIndex.has_value())
+ {
+ InOutIndex = ParsedIndex.value();
+ }
+ }
+
+ return FindServiceInJson(Result.AsText(), ServiceId);
+}
+
+std::string
+ConsulClient::GetAgentServicesJson()
+{
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders);
+ if (!Result)
+ {
+ return "{}";
+ }
+ return Result.ToText();
+}
+
+std::string
+ConsulClient::GetAgentChecksJson()
+{
+ HttpClient::KeyValueMap AdditionalHeaders;
+ ApplyCommonHeaders(AdditionalHeaders);
+
+ HttpClient::Response Result = m_HttpClient.Get("v1/agent/checks", AdditionalHeaders);
+ if (!Result)
+ {
+ return "{}";
+ }
+ return Result.ToText();
+}
+
+void
+ConsulClient::WorkerLoop()
+{
+ SetCurrentThreadName("ConsulClient");
+
+ std::unordered_set<std::string> RegisteredServices;
+
+ while (true)
+ {
+ m_Wakeup.Wait(-1);
+ m_Wakeup.Reset();
+
+ const bool Stopping = m_Stop.load();
+
+ std::vector<PendingOp> Batch;
+ m_QueueLock.WithExclusiveLock([&] { Batch.swap(m_Queue); });
+
+ for (size_t Index = 0; Index < Batch.size(); ++Index)
+ {
+ PendingOp& Op = Batch[Index];
+
+ if (Stopping && Op.Type == PendingOp::Kind::Register)
+ {
+ continue;
+ }
+
+ const std::string_view OpName = (Op.Type == PendingOp::Kind::Register) ? "register" : "deregister";
+
+ try
+ {
+ if (Op.Type == PendingOp::Kind::Register)
+ {
+ bool Ok = DoRegister(Op.Info);
+ if (Ok)
+ {
+ RegisteredServices.insert(Op.Info.ServiceId);
+ }
+ else
+ {
+ const size_t Remaining = Batch.size() - Index - 1;
+ ZEN_WARN("ConsulClient worker: {} for '{}' failed; dropping {} remaining queued op(s)",
+ OpName,
+ Op.Info.ServiceId,
+ Remaining);
+ break;
+ }
+ }
+ else
+ {
+ ZEN_ASSERT(Op.Type == PendingOp::Kind::Deregister);
+ if (RegisteredServices.erase(Op.Info.ServiceId) == 1u)
+ {
+ if (!DoDeregister(Op.Info.ServiceId))
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' failed", OpName, Op.Info.ServiceId);
+ }
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' threw: {}", OpName, Op.Info.ServiceId, Ex.what());
+ }
+ catch (...)
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' threw unknown exception", OpName, Op.Info.ServiceId);
+ }
+ }
+
+ if (Stopping)
+ {
+ break;
+ }
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
+{
+ m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this);
+}
+
+ServiceRegistration::~ServiceRegistration()
+{
+ try
+ {
+ m_StopEvent.Set();
+
+ if (m_RegistrationThread.joinable())
+ {
+ m_RegistrationThread.join();
+ }
+
+ if (m_IsRegistered.load())
+ {
+ if (!m_Client->DoDeregister(m_Info.ServiceId))
+ {
+ ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what());
+ }
+}
+
+void
+ServiceRegistration::WaitForReadyEvent(int MaxWaitMS)
+{
+ if (m_IsRegistered.load())
+ {
+ return;
+ }
+ m_ReadyWakeupEvent.Wait(MaxWaitMS);
+}
+
+void
+ServiceRegistration::RegistrationLoop()
+{
+ try
+ {
+ SetCurrentThreadName("ConsulRegistration");
+
+ // Exponential backoff delays: 100ms, 200ms, 400ms
+ constexpr int BackoffDelaysMs[] = {100, 200, 400};
+ constexpr int MaxAttempts = 3;
+ constexpr int RetryIntervalMs = 5000;
+
+ while (true)
+ {
+ bool Succeeded = false;
+ // Try to register with exponential backoff
+ for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt)
+ {
+ if (m_Client->DoRegister(m_Info))
+ {
+ Succeeded = true;
+ break;
+ }
+
+ if (m_StopEvent.Wait(BackoffDelaysMs[Attempt]))
+ {
+ return;
+ }
+ }
+
+ if (Succeeded || m_Client->DoRegister(m_Info))
+ {
+ break;
+ }
+
+ // All attempts failed, log warning and wait before retrying
+ ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s",
+ m_Info.ServiceId,
+ MaxAttempts + 1,
+ RetryIntervalMs / 1000);
+
+ // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor
+ // wakes this immediately so the thread exits without polling.
+ if (m_StopEvent.Wait(RetryIntervalMs))
+ {
+ return;
+ }
+ }
+
+ ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId);
+ m_IsRegistered.store(true);
+ m_ReadyWakeupEvent.Set();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Tests
+
+#if ZEN_WITH_TESTS
+
+void
+consul_forcelink()
+{
+}
+
+struct MockHealthService : public HttpService
+{
+ std::atomic<bool> FailHealth{false};
+ std::atomic<int> HealthCheckCount{0};
+
+ const char* BaseUri() const override { return "/"; }
+
+ void HandleRequest(HttpServerRequest& Request) override
+ {
+ std::string_view Uri = Request.RelativeUri();
+ if (Uri == "health/" || Uri == "health")
+ {
+ HealthCheckCount.fetch_add(1);
+ if (FailHealth.load())
+ {
+ Request.WriteResponse(HttpResponseCode::ServiceUnavailable);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+ }
+ return;
+ }
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+};
+
+struct TestHealthServer
+{
+ MockHealthService Mock;
+
+ void Start()
+ {
+ m_TmpDir.emplace();
+ m_Server = CreateHttpServer(HttpServerConfig{.ServerClass = "asio"});
+ m_Port = m_Server->Initialize(0, m_TmpDir->Path() / "http");
+ REQUIRE(m_Port != -1);
+ m_Server->RegisterService(Mock);
+ m_ServerThread = std::thread([this]() { m_Server->Run(false); });
+ }
+
+ int Port() const { return m_Port; }
+
+ ~TestHealthServer()
+ {
+ if (m_Server)
+ {
+ m_Server->RequestExit();
+ }
+ if (m_ServerThread.joinable())
+ {
+ m_ServerThread.join();
+ }
+ if (m_Server)
+ {
+ m_Server->Close();
+ }
+ }
+
+private:
+ std::optional<ScopedTemporaryDirectory> m_TmpDir;
+ Ref<HttpServer> m_Server;
+ std::thread m_ServerThread;
+ int m_Port = -1;
+};
+
+static bool
+WaitForCondition(std::function<bool()> Predicate, int TimeoutMs, int PollIntervalMs = 200)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ if (Predicate())
+ {
+ return true;
+ }
+ Sleep(PollIntervalMs);
+ }
+ return Predicate();
+}
+
+static std::string
+GetCheckStatus(ConsulClient& Client, std::string_view ServiceId)
+{
+ using namespace std::literals;
+
+ std::string JsonError;
+ CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), JsonError);
+ if (!ChecksRoot || !JsonError.empty())
+ {
+ return {};
+ }
+
+ for (CbFieldView F : ChecksRoot)
+ {
+ if (!F.IsObject())
+ {
+ continue;
+ }
+ for (CbFieldView C : F.AsObjectView())
+ {
+ CbObjectView Check = C.AsObjectView();
+ if (Check["ServiceID"sv].AsString() == ServiceId)
+ {
+ return std::string(Check["Status"sv].AsString());
+ }
+ }
+ }
+ return {};
+}
+
+TEST_SUITE_BEGIN("util.consul");
+
+TEST_CASE("util.consul.service_lifecycle")
+{
+ ConsulProcess ConsulProc;
+ ConsulProc.SpawnConsulAgent();
+
+ TestHealthServer HealthServer;
+ HealthServer.Start();
+
+ ConsulClient Client(
+ {.BaseUri = "http://localhost:8500/", .ConnectTimeout = std::chrono::seconds{5}, .Timeout = std::chrono::seconds{5}});
+
+ const std::string ServiceId = "test-health-svc";
+
+ ServiceRegistrationInfo Info;
+ Info.ServiceId = ServiceId;
+ Info.ServiceName = "zen-test-health";
+ Info.Address = "127.0.0.1";
+ Info.Port = static_cast<uint16_t>(HealthServer.Port());
+ Info.HealthEndpoint = "health/";
+ Info.HealthIntervalSeconds = 1;
+ Info.DeregisterAfterSeconds = 60;
+
+ // Register/Deregister are async; wait for the worker to propagate to Consul.
+
+ // Phase 1: Register and verify Consul sends health checks to our service
+ Client.RegisterService(Info);
+ REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50));
+
+ REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
+ CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1);
+ CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
+
+ // Phase 2: Explicit deregister
+ Client.DeregisterService(ServiceId);
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
+
+ // Phase 3: Register with InitialStatus, verify immediately passing before any health check fires,
+ // then fail health and verify check goes critical
+ HealthServer.Mock.HealthCheckCount.store(0);
+ HealthServer.Mock.FailHealth.store(false);
+
+ Info.InitialStatus = "passing";
+ Client.RegisterService(Info);
+ REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50));
+
+ // Registration is async; by the time HasService observes the service the 1s health interval
+ // may already have fired, so we can't robustly assert HealthCheckCount==0. The "passing" status
+ // below still proves InitialStatus applied (it can only be "passing" via InitialStatus or a
+ // successful health check - both are acceptable demonstrations).
+ CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
+
+ REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
+ CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
+
+ HealthServer.Mock.FailHealth.store(true);
+
+ // Wait for Consul to observe the failing check
+ REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000, 50));
+ CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical");
+
+ // Phase 4: Explicit deregister while critical
+ Client.DeregisterService(ServiceId);
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
+
+ // Phase 5: Deregister an already-deregistered service - should not crash
+ Client.DeregisterService(ServiceId);
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
+
+ ConsulProc.StopConsulAgent();
+}
+
+TEST_SUITE_END();
+
+#endif
+
} // namespace zen::consul