diff options
Diffstat (limited to 'src/zenutil/consul')
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 740 |
1 files changed, 730 insertions, 10 deletions
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index 6ddebf97a..762f06817 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -2,14 +2,25 @@ #include <zenutil/consul.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/except_fmt.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> #include <zencore/timer.h> +#include <zenhttp/httpserver.h> + +#include <unordered_set> + +ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END namespace zen::consul { @@ -17,7 +28,7 @@ namespace zen::consul { struct ConsulProcess::Impl { - Impl(std::string_view BaseUri) : m_HttpClient(BaseUri) {} + Impl(std::string_view BaseUri, std::string_view Token = "") : m_Token(Token), m_HttpClient(BaseUri) {} ~Impl() = default; void SpawnConsulAgent() @@ -28,9 +39,10 @@ struct ConsulProcess::Impl } CreateProcOptions Options; - Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; + Options.Flags |= CreateProcOptions::Flag_NewProcessGroup; - CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); + const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL); + CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { @@ -40,10 +52,16 @@ struct ConsulProcess::Impl // Poll to check when the agent is ready + HttpClient::KeyValueMap AdditionalHeaders; + if (!m_Token.empty()) + { + AdditionalHeaders.Entries.emplace("X-Consul-Token", m_Token); + } + do { Sleep(100); - HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader"); + HttpClient::Response Resp = m_HttpClient.Get("v1/status/leader", AdditionalHeaders); if (Resp) { ZEN_INFO("Consul agent started successfully (waited {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); @@ -71,6 +89,7 @@ struct ConsulProcess::Impl private: ProcessHandle m_ProcessHandle; + std::string m_Token; HttpClient m_HttpClient; }; @@ -96,20 +115,42 @@ ConsulProcess::StopConsulAgent() ////////////////////////////////////////////////////////////////////////// -ConsulClient::ConsulClient(std::string_view BaseUri) : m_HttpClient(BaseUri) +ConsulClient::ConsulClient(const Configuration& Config) +: m_Config(Config) +, m_HttpClient(m_Config.BaseUri, HttpClientSettings{.ConnectTimeout = m_Config.ConnectTimeout, .Timeout = m_Config.Timeout}, [this] { + return m_Stop.load(); +}) { + m_Worker = std::thread(&ConsulClient::WorkerLoop, this); } ConsulClient::~ConsulClient() { + try + { + m_Stop.store(true); + m_Wakeup.Set(); + if (m_Worker.joinable()) + { + m_Worker.join(); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("ConsulClient::~ConsulClient threw exception: {}", Ex.what()); + } } void ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) { - IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); - HttpClient::Response Result = - m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}}); + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); + + IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); + ValueBuffer.SetContentType(HttpContentType::kText); + HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage("")); @@ -119,7 +160,10 @@ ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) std::string ConsulClient::GetKeyValue(std::string_view Key) { - HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key)); + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get(fmt::format("v1/kv/{}?raw", Key), AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::GetKeyValue() failed to get key '{}' ({})", Key, Result.ErrorMessage("")); @@ -130,11 +174,687 @@ ConsulClient::GetKeyValue(std::string_view Key) void ConsulClient::DeleteKey(std::string_view Key) { - HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key)); + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Delete(fmt::format("v1/kv/{}", Key), AdditionalHeaders); if (!Result) { throw runtime_error("ConsulClient::DeleteKey() failed to delete key '{}' ({})", Key, Result.ErrorMessage("")); } } +void +ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) +{ + PendingOp Op{PendingOp::Kind::Register, Info}; + m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); }); + m_Wakeup.Set(); +} + +void +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + PendingOp Op; + Op.Type = PendingOp::Kind::Deregister; + Op.Info.ServiceId = std::string(ServiceId); + m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); }); + m_Wakeup.Set(); +} + +bool +ConsulClient::DoRegister(const ServiceRegistrationInfo& Info) +{ + using namespace std::literals; + + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); + + HttpClient::KeyValueMap AdditionalParameters(std::make_pair<std::string, std::string>("replace-existing-checks", "true")); + + CbObjectWriter Writer; + { + Writer.AddString("ID"sv, Info.ServiceId); + Writer.AddString("Name"sv, Info.ServiceName); + if (!Info.Address.empty()) + { + Writer.AddString("Address"sv, Info.Address); + } + Writer.AddInteger("Port"sv, Info.Port); + if (!Info.Tags.empty()) + { + Writer.BeginArray("Tags"sv); + for (const std::pair<std::string, std::string>& Tag : Info.Tags) + { + Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second)); + } + Writer.EndArray(); // Tags + } + if (Info.HealthIntervalSeconds != 0) + { + // Consul requires Interval whenever HTTP is specified; omit the Check block entirely + // when no interval is configured (e.g. during Provisioning). + Writer.BeginObject("Check"sv); + { + Writer.AddString( + "HTTP"sv, + fmt::format("http://{}:{}/{}", Info.Address.empty() ? "localhost" : Info.Address, Info.Port, Info.HealthEndpoint)); + Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds)); + if (Info.DeregisterAfterSeconds != 0) + { + Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds)); + } + if (!Info.InitialStatus.empty()) + { + Writer.AddString("Status"sv, Info.InitialStatus); + } + } + Writer.EndObject(); // Check + } + } + + ExtendableStringBuilder<512> SB; + CompactBinaryToJson(Writer.Save(), SB); + + IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); + PayloadBuffer.SetContentType(HttpContentType::kJSON); + HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, AdditionalHeaders, AdditionalParameters); + + if (!Result) + { + ZEN_WARN("ConsulClient::DoRegister() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::DoDeregister(std::string_view ServiceId) +{ + using namespace std::literals; + + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); + + HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, AdditionalHeaders); + if (Result) + { + return true; + } + + // Agent deregister failed - fall back to catalog deregister. + // This handles cases where the service was registered via a different Consul agent + // (e.g. load-balanced endpoint routing to different agents). + std::string NodeName = GetNodeName(); + if (!NodeName.empty()) + { + CbObjectWriter Writer; + Writer.AddString("Node"sv, NodeName); + Writer.AddString("ServiceID"sv, ServiceId); + + ExtendableStringBuilder<256> SB; + CompactBinaryToJson(Writer.Save(), SB); + + IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); + PayloadBuffer.SetContentType(HttpContentType::kJSON); + + HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders); + if (CatalogResult) + { + ZEN_INFO("ConsulClient::DoDeregister() deregistered service '{}' via catalog fallback (agent error: {})", + ServiceId, + Result.ErrorMessage("")); + return true; + } + + ZEN_WARN("ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, catalog: {})", + ServiceId, + Result.ErrorMessage(""), + CatalogResult.ErrorMessage("")); + } + else + { + ZEN_WARN( + "ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, could not determine node name for catalog " + "fallback)", + ServiceId, + Result.ErrorMessage("")); + } + + return false; +} + +std::string +ConsulClient::GetNodeName() +{ + using namespace std::literals; + + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get("v1/agent/self", AdditionalHeaders); + if (!Result) + { + return {}; + } + + std::string JsonError; + CbFieldIterator Root = LoadCompactBinaryFromJson(Result.AsText(), JsonError); + if (!Root || !JsonError.empty()) + { + return {}; + } + + for (CbFieldView Field : Root) + { + if (Field.GetName() == "Config"sv) + { + CbObjectView Config = Field.AsObjectView(); + if (Config) + { + return std::string(Config["NodeName"sv].AsString()); + } + } + } + + return {}; +} + +void +ConsulClient::ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap) +{ + std::string Token; + if (!m_Config.StaticToken.empty()) + { + Token = m_Config.StaticToken; + } + else if (!m_Config.TokenEnvName.empty()) + { + Token = GetEnvVariable(m_Config.TokenEnvName); + } + + if (!Token.empty()) + { + InOutHeaderMap.Entries.emplace("X-Consul-Token", Token); + } +} + +bool +ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId) +{ + using namespace std::literals; + + std::string JsonError; + CbFieldIterator Root = LoadCompactBinaryFromJson(Json, JsonError); + if (Root && JsonError.empty()) + { + for (CbFieldView RootIterator : Root) + { + CbObjectView ServiceObject = RootIterator.AsObjectView(); + if (ServiceObject) + { + for (CbFieldView ServiceIterator : ServiceObject) + { + CbObjectView ObjectIterator = ServiceIterator.AsObjectView(); + if (ObjectIterator["ID"sv].AsString() == ServiceId) + { + return true; + } + } + } + } + } + return false; +} + +bool +ConsulClient::HasService(std::string_view ServiceId) +{ + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders); + if (!Result) + { + return false; + } + return FindServiceInJson(Result.AsText(), ServiceId); +} + +bool +ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds) +{ + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + // Note: m_HttpClient runs with a 500ms client-side timeout to keep Register/Deregister from + // stalling the hub state machine when the agent is unreachable. That bound applies here too: + // WaitSeconds is effectively capped at ~500ms regardless of the argument, so callers must + // treat this as a short-poll and loop rather than rely on a true blocking query. + HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}}); + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders, Parameters); + if (!Result) + { + return false; + } + + auto IndexIt = Result.Header->find("X-Consul-Index"); + if (IndexIt != Result.Header->end()) + { + std::optional<uint64_t> ParsedIndex = ParseInt<uint64_t>(IndexIt->second); + if (ParsedIndex.has_value()) + { + InOutIndex = ParsedIndex.value(); + } + } + + return FindServiceInJson(Result.AsText(), ServiceId); +} + +std::string +ConsulClient::GetAgentServicesJson() +{ + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +std::string +ConsulClient::GetAgentChecksJson() +{ + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get("v1/agent/checks", AdditionalHeaders); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +void +ConsulClient::WorkerLoop() +{ + SetCurrentThreadName("ConsulClient"); + + std::unordered_set<std::string> RegisteredServices; + + while (true) + { + m_Wakeup.Wait(-1); + m_Wakeup.Reset(); + + const bool Stopping = m_Stop.load(); + + std::vector<PendingOp> Batch; + m_QueueLock.WithExclusiveLock([&] { Batch.swap(m_Queue); }); + + for (size_t Index = 0; Index < Batch.size(); ++Index) + { + PendingOp& Op = Batch[Index]; + + if (Stopping && Op.Type == PendingOp::Kind::Register) + { + continue; + } + + const std::string_view OpName = (Op.Type == PendingOp::Kind::Register) ? "register" : "deregister"; + + try + { + if (Op.Type == PendingOp::Kind::Register) + { + bool Ok = DoRegister(Op.Info); + if (Ok) + { + RegisteredServices.insert(Op.Info.ServiceId); + } + else + { + const size_t Remaining = Batch.size() - Index - 1; + ZEN_WARN("ConsulClient worker: {} for '{}' failed; dropping {} remaining queued op(s)", + OpName, + Op.Info.ServiceId, + Remaining); + break; + } + } + else + { + ZEN_ASSERT(Op.Type == PendingOp::Kind::Deregister); + if (RegisteredServices.erase(Op.Info.ServiceId) == 1u) + { + if (!DoDeregister(Op.Info.ServiceId)) + { + ZEN_WARN("ConsulClient worker: {} for '{}' failed", OpName, Op.Info.ServiceId); + } + } + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("ConsulClient worker: {} for '{}' threw: {}", OpName, Op.Info.ServiceId, Ex.what()); + } + catch (...) + { + ZEN_WARN("ConsulClient worker: {} for '{}' threw unknown exception", OpName, Op.Info.ServiceId); + } + } + + if (Stopping) + { + break; + } + } +} + +////////////////////////////////////////////////////////////////////////// + +ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) +{ + m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this); +} + +ServiceRegistration::~ServiceRegistration() +{ + try + { + m_StopEvent.Set(); + + if (m_RegistrationThread.joinable()) + { + m_RegistrationThread.join(); + } + + if (m_IsRegistered.load()) + { + if (!m_Client->DoDeregister(m_Info.ServiceId)) + { + ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what()); + } +} + +void +ServiceRegistration::WaitForReadyEvent(int MaxWaitMS) +{ + if (m_IsRegistered.load()) + { + return; + } + m_ReadyWakeupEvent.Wait(MaxWaitMS); +} + +void +ServiceRegistration::RegistrationLoop() +{ + try + { + SetCurrentThreadName("ConsulRegistration"); + + // Exponential backoff delays: 100ms, 200ms, 400ms + constexpr int BackoffDelaysMs[] = {100, 200, 400}; + constexpr int MaxAttempts = 3; + constexpr int RetryIntervalMs = 5000; + + while (true) + { + bool Succeeded = false; + // Try to register with exponential backoff + for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt) + { + if (m_Client->DoRegister(m_Info)) + { + Succeeded = true; + break; + } + + if (m_StopEvent.Wait(BackoffDelaysMs[Attempt])) + { + return; + } + } + + if (Succeeded || m_Client->DoRegister(m_Info)) + { + break; + } + + // All attempts failed, log warning and wait before retrying + ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", + m_Info.ServiceId, + MaxAttempts + 1, + RetryIntervalMs / 1000); + + // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor + // wakes this immediately so the thread exits without polling. + if (m_StopEvent.Wait(RetryIntervalMs)) + { + return; + } + } + + ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); + m_IsRegistered.store(true); + m_ReadyWakeupEvent.Set(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what()); + } +} + +////////////////////////////////////////////////////////////////////////// +// Tests + +#if ZEN_WITH_TESTS + +void +consul_forcelink() +{ +} + +struct MockHealthService : public HttpService +{ + std::atomic<bool> FailHealth{false}; + std::atomic<int> HealthCheckCount{0}; + + const char* BaseUri() const override { return "/"; } + + void HandleRequest(HttpServerRequest& Request) override + { + std::string_view Uri = Request.RelativeUri(); + if (Uri == "health/" || Uri == "health") + { + HealthCheckCount.fetch_add(1); + if (FailHealth.load()) + { + Request.WriteResponse(HttpResponseCode::ServiceUnavailable); + } + else + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); + } + return; + } + Request.WriteResponse(HttpResponseCode::NotFound); + } +}; + +struct TestHealthServer +{ + MockHealthService Mock; + + void Start() + { + m_TmpDir.emplace(); + m_Server = CreateHttpServer(HttpServerConfig{.ServerClass = "asio"}); + m_Port = m_Server->Initialize(0, m_TmpDir->Path() / "http"); + REQUIRE(m_Port != -1); + m_Server->RegisterService(Mock); + m_ServerThread = std::thread([this]() { m_Server->Run(false); }); + } + + int Port() const { return m_Port; } + + ~TestHealthServer() + { + if (m_Server) + { + m_Server->RequestExit(); + } + if (m_ServerThread.joinable()) + { + m_ServerThread.join(); + } + if (m_Server) + { + m_Server->Close(); + } + } + +private: + std::optional<ScopedTemporaryDirectory> m_TmpDir; + Ref<HttpServer> m_Server; + std::thread m_ServerThread; + int m_Port = -1; +}; + +static bool +WaitForCondition(std::function<bool()> Predicate, int TimeoutMs, int PollIntervalMs = 200) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + if (Predicate()) + { + return true; + } + Sleep(PollIntervalMs); + } + return Predicate(); +} + +static std::string +GetCheckStatus(ConsulClient& Client, std::string_view ServiceId) +{ + using namespace std::literals; + + std::string JsonError; + CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), JsonError); + if (!ChecksRoot || !JsonError.empty()) + { + return {}; + } + + for (CbFieldView F : ChecksRoot) + { + if (!F.IsObject()) + { + continue; + } + for (CbFieldView C : F.AsObjectView()) + { + CbObjectView Check = C.AsObjectView(); + if (Check["ServiceID"sv].AsString() == ServiceId) + { + return std::string(Check["Status"sv].AsString()); + } + } + } + return {}; +} + +TEST_SUITE_BEGIN("util.consul"); + +TEST_CASE("util.consul.service_lifecycle") +{ + ConsulProcess ConsulProc; + ConsulProc.SpawnConsulAgent(); + + TestHealthServer HealthServer; + HealthServer.Start(); + + ConsulClient Client( + {.BaseUri = "http://localhost:8500/", .ConnectTimeout = std::chrono::seconds{5}, .Timeout = std::chrono::seconds{5}}); + + const std::string ServiceId = "test-health-svc"; + + ServiceRegistrationInfo Info; + Info.ServiceId = ServiceId; + Info.ServiceName = "zen-test-health"; + Info.Address = "127.0.0.1"; + Info.Port = static_cast<uint16_t>(HealthServer.Port()); + Info.HealthEndpoint = "health/"; + Info.HealthIntervalSeconds = 1; + Info.DeregisterAfterSeconds = 60; + + // Register/Deregister are async; wait for the worker to propagate to Consul. + + // Phase 1: Register and verify Consul sends health checks to our service + Client.RegisterService(Info); + REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50)); + + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); + CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); + + // Phase 2: Explicit deregister + Client.DeregisterService(ServiceId); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); + + // Phase 3: Register with InitialStatus, verify immediately passing before any health check fires, + // then fail health and verify check goes critical + HealthServer.Mock.HealthCheckCount.store(0); + HealthServer.Mock.FailHealth.store(false); + + Info.InitialStatus = "passing"; + Client.RegisterService(Info); + REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50)); + + // Registration is async; by the time HasService observes the service the 1s health interval + // may already have fired, so we can't robustly assert HealthCheckCount==0. The "passing" status + // below still proves InitialStatus applied (it can only be "passing" via InitialStatus or a + // successful health check - both are acceptable demonstrations). + CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); + + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); + + HealthServer.Mock.FailHealth.store(true); + + // Wait for Consul to observe the failing check + REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000, 50)); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical"); + + // Phase 4: Explicit deregister while critical + Client.DeregisterService(ServiceId); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); + + // Phase 5: Deregister an already-deregistered service - should not crash + Client.DeregisterService(ServiceId); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); + + ConsulProc.StopConsulAgent(); +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::consul |