diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-02 09:58:42 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-02 09:58:42 +0200 |
| commit | b5fa9fd16fbaa1ceac95ff3a7f4c8e9f414ee525 (patch) | |
| tree | 87275c9383bea83f7e384a47b668fdea3d65b46f /src/zenutil | |
| parent | add provision button to hub ui (#915) (diff) | |
| download | zen-b5fa9fd16fbaa1ceac95ff3a7f4c8e9f414ee525.tar.xz zen-b5fa9fd16fbaa1ceac95ff3a7f4c8e9f414ee525.zip | |
s3 and consul fixes (#916)
* fix endpoint for stats/hub in compute/hub.html page
* fix api token call failure for imds (using wrong overload for Put)
* add "localhost" to healt check url in consul when no address is given
* add consul fallback deregister if normal deregister fails
* add consul registration unit test
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/cloud/imdscredentials.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 278 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 3 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
4 files changed, 279 insertions, 6 deletions
diff --git a/src/zenutil/cloud/imdscredentials.cpp b/src/zenutil/cloud/imdscredentials.cpp index dde1dc019..5a6cf45d2 100644 --- a/src/zenutil/cloud/imdscredentials.cpp +++ b/src/zenutil/cloud/imdscredentials.cpp @@ -115,7 +115,7 @@ ImdsCredentialProvider::FetchToken() HttpClient::KeyValueMap Headers; Headers->emplace("X-aws-ec2-metadata-token-ttl-seconds", "21600"); - HttpClient::Response Response = m_HttpClient.Put("/latest/api/token", Headers); + HttpClient::Response Response = m_HttpClient.Put("/latest/api/token", IoBuffer{}, Headers); if (!Response.IsSuccess()) { ZEN_WARN("IMDS token request failed: {}", Response.ErrorMessage("PUT /latest/api/token")); diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index c0cea20c0..951beed65 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -9,9 +9,13 @@ #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/timer.h> +#include <zenhttp/httpserver.h> + #include <fmt/format.h> namespace zen::consul { @@ -193,7 +197,9 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) // when no interval is configured (e.g. during Provisioning). Writer.BeginObject("Check"sv); { - Writer.AddString("HTTP"sv, fmt::format("http://{}:{}/{}", Info.Address, Info.Port, Info.HealthEndpoint)); + Writer.AddString( + "HTTP"sv, + fmt::format("http://{}:{}/{}", Info.Address.empty() ? "localhost" : Info.Address, Info.Port, Info.HealthEndpoint)); Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds)); if (Info.DeregisterAfterSeconds != 0) { @@ -223,19 +229,94 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) bool ConsulClient::DeregisterService(std::string_view ServiceId) { + using namespace std::literals; + HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); AdditionalHeaders.Entries.emplace(HttpClient::Accept(HttpContentType::kJSON)); - HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), AdditionalHeaders); + HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), IoBuffer{}, AdditionalHeaders); + if (Result) + { + return true; + } + + // Agent deregister failed — fall back to catalog deregister. + // This handles cases where the service was registered via a different Consul agent + // (e.g. load-balanced endpoint routing to different agents). + std::string NodeName = GetNodeName(); + if (!NodeName.empty()) + { + CbObjectWriter Writer; + Writer.AddString("Node"sv, NodeName); + Writer.AddString("ServiceID"sv, ServiceId); + + ExtendableStringBuilder<256> SB; + CompactBinaryToJson(Writer.Save(), SB); + + IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); + PayloadBuffer.SetContentType(HttpContentType::kJSON); + + HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders); + if (CatalogResult) + { + ZEN_INFO("ConsulClient::DeregisterService() deregistered service '{}' via catalog fallback (agent error: {})", + ServiceId, + Result.ErrorMessage("")); + return true; + } + + ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, catalog: {})", + ServiceId, + Result.ErrorMessage(""), + CatalogResult.ErrorMessage("")); + } + else + { + ZEN_WARN( + "ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, could not determine node name for catalog " + "fallback)", + ServiceId, + Result.ErrorMessage("")); + } + return false; +} + +std::string +ConsulClient::GetNodeName() +{ + using namespace std::literals; + + HttpClient::KeyValueMap AdditionalHeaders; + ApplyCommonHeaders(AdditionalHeaders); + + HttpClient::Response Result = m_HttpClient.Get("v1/agent/self", AdditionalHeaders); if (!Result) { - ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); - return false; + return {}; } - return true; + std::string JsonError; + CbFieldIterator Root = LoadCompactBinaryFromJson(Result.AsText(), JsonError); + if (!Root || !JsonError.empty()) + { + return {}; + } + + for (CbFieldView Field : Root) + { + if (Field.GetName() == "Config"sv) + { + CbObjectView Config = Field.AsObjectView(); + if (Config) + { + return std::string(Config["NodeName"sv].AsString()); + } + } + } + + return {}; } void @@ -456,4 +537,191 @@ ServiceRegistration::RegistrationLoop() } } +////////////////////////////////////////////////////////////////////////// +// Tests + +#if ZEN_WITH_TESTS + +void +consul_forcelink() +{ +} + +struct MockHealthService : public HttpService +{ + std::atomic<bool> FailHealth{false}; + std::atomic<int> HealthCheckCount{0}; + + const char* BaseUri() const override { return "/"; } + + void HandleRequest(HttpServerRequest& Request) override + { + std::string_view Uri = Request.RelativeUri(); + if (Uri == "health/" || Uri == "health") + { + HealthCheckCount.fetch_add(1); + if (FailHealth.load()) + { + Request.WriteResponse(HttpResponseCode::ServiceUnavailable); + } + else + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); + } + return; + } + Request.WriteResponse(HttpResponseCode::NotFound); + } +}; + +struct TestHealthServer +{ + MockHealthService Mock; + + void Start() + { + m_TmpDir.emplace(); + m_Server = CreateHttpServer(HttpServerConfig{.ServerClass = "asio"}); + m_Port = m_Server->Initialize(0, m_TmpDir->Path() / "http"); + REQUIRE(m_Port != -1); + m_Server->RegisterService(Mock); + m_ServerThread = std::thread([this]() { m_Server->Run(false); }); + } + + int Port() const { return m_Port; } + + ~TestHealthServer() + { + if (m_Server) + { + m_Server->RequestExit(); + } + if (m_ServerThread.joinable()) + { + m_ServerThread.join(); + } + if (m_Server) + { + m_Server->Close(); + } + } + +private: + std::optional<ScopedTemporaryDirectory> m_TmpDir; + Ref<HttpServer> m_Server; + std::thread m_ServerThread; + int m_Port = -1; +}; + +static bool +WaitForCondition(std::function<bool()> Predicate, int TimeoutMs, int PollIntervalMs = 200) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + if (Predicate()) + { + return true; + } + Sleep(PollIntervalMs); + } + return Predicate(); +} + +static std::string +GetCheckStatus(ConsulClient& Client, std::string_view ServiceId) +{ + using namespace std::literals; + + std::string JsonError; + CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), JsonError); + if (!ChecksRoot || !JsonError.empty()) + { + return {}; + } + + for (CbFieldView F : ChecksRoot) + { + if (!F.IsObject()) + { + continue; + } + for (CbFieldView C : F.AsObjectView()) + { + CbObjectView Check = C.AsObjectView(); + if (Check["ServiceID"sv].AsString() == ServiceId) + { + return std::string(Check["Status"sv].AsString()); + } + } + } + return {}; +} + +TEST_SUITE_BEGIN("util.consul"); + +TEST_CASE("util.consul.service_lifecycle") +{ + ConsulProcess ConsulProc; + ConsulProc.SpawnConsulAgent(); + + TestHealthServer HealthServer; + HealthServer.Start(); + + ConsulClient Client({.BaseUri = "http://localhost:8500/"}); + + const std::string ServiceId = "test-health-svc"; + + ServiceRegistrationInfo Info; + Info.ServiceId = ServiceId; + Info.ServiceName = "zen-test-health"; + Info.Address = "127.0.0.1"; + Info.Port = static_cast<uint16_t>(HealthServer.Port()); + Info.HealthEndpoint = "health/"; + Info.HealthIntervalSeconds = 1; + Info.DeregisterAfterSeconds = 60; + + // Phase 1: Register and verify Consul sends health checks to our service + REQUIRE(Client.RegisterService(Info)); + REQUIRE(Client.HasService(ServiceId)); + + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); + CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); + + // Phase 2: Explicit deregister + REQUIRE(Client.DeregisterService(ServiceId)); + CHECK_FALSE(Client.HasService(ServiceId)); + + // Phase 3: Register again, verify passing, then fail health and verify check goes critical + HealthServer.Mock.HealthCheckCount.store(0); + HealthServer.Mock.FailHealth.store(false); + + REQUIRE(Client.RegisterService(Info)); + REQUIRE(Client.HasService(ServiceId)); + + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); + + HealthServer.Mock.FailHealth.store(true); + + // Wait for Consul to observe the failing check + REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000)); + CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical"); + + // Phase 4: Explicit deregister while critical + REQUIRE(Client.DeregisterService(ServiceId)); + CHECK_FALSE(Client.HasService(ServiceId)); + + // Phase 5: Deregister an already-deregistered service - should not crash + Client.DeregisterService(ServiceId); + CHECK_FALSE(Client.HasService(ServiceId)); + + ConsulProc.StopConsulAgent(); +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index f48e5b212..7517ddd1e 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -62,6 +62,7 @@ public: private: static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId); void ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap); + std::string GetNodeName(); Configuration m_Config; HttpClient m_HttpClient; @@ -116,4 +117,6 @@ private: void RegistrationLoop(); }; +void consul_forcelink(); + } // namespace zen::consul diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 2ca380c75..516eec3a9 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -5,6 +5,7 @@ #if ZEN_WITH_TESTS # include <zenutil/cloud/imdscredentials.h> +# include <zenutil/consul.h> # include <zenutil/cloud/s3client.h> # include <zenutil/cloud/sigv4.h> # include <zenutil/config/commandlineoptions.h> @@ -20,6 +21,7 @@ zenutil_forcelinktests() { cache::rpcrecord_forcelink(); commandlineoptions_forcelink(); + consul::consul_forcelink(); imdscredentials_forcelink(); logstreamlistener_forcelink(); subprocessmanager_forcelink(); |