diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 3 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 88 | ||||
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 167 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 36 |
4 files changed, 227 insertions, 67 deletions
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index 35a840e5d..e83e18446 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -663,10 +663,11 @@ TEST_CASE("hub.consul.provision.registration") Result = HubClient.Post("modules/testmod/deprovision"); REQUIRE(Result); REQUIRE(WaitForConsulService(Client, "testmod", false, 10000)); + REQUIRE(WaitForModuleGone(HubClient, "testmod")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } } diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 1390d112e..ebc2cf2f1 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -468,47 +468,57 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, return; } - if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned) + switch (NewState) { - consul::ServiceRegistrationInfo ServiceInfo{ - .ServiceId = std::string(ModuleId), - .ServiceName = "zen-storage", - .Port = Info.Port, - .HealthEndpoint = "health", - .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), - std::make_pair("zen-hub", std::string(HubInstanceId)), - std::make_pair("version", std::string(ZEN_CFG_VERSION))}, - .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulHealthIntervalSeconds, - .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulDeregisterAfterSeconds, - .InitialStatus = NewState == HubInstanceState::Provisioned ? "passing" : ""}; - - if (!m_ConsulClient->RegisterService(ServiceInfo)) - { - ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); - } - else - { - ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", - ModuleId, - Info.Port, - ServiceInfo.ServiceName); - } - } - else if (NewState == HubInstanceState::Unprovisioned) - { - if (!m_ConsulClient->DeregisterService(ModuleId)) - { - ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", - ModuleId, - Info.Port); - } - else - { - ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); - } + case HubInstanceState::Provisioning: + case HubInstanceState::Waking: + case HubInstanceState::Recovering: + case HubInstanceState::Provisioned: + { + const bool IsProvisioned = NewState == HubInstanceState::Provisioned; + + consul::ServiceRegistrationInfo ServiceInfo{ + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", + .Port = Info.Port, + .HealthEndpoint = "health", + .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), + std::make_pair("zen-hub", std::string(HubInstanceId)), + std::make_pair("version", std::string(ZEN_CFG_VERSION))}, + .HealthIntervalSeconds = IsProvisioned ? m_ConsulHealthIntervalSeconds : 0u, + .DeregisterAfterSeconds = IsProvisioned ? m_ConsulDeregisterAfterSeconds : 0u, + .InitialStatus = IsProvisioned ? "passing" : ""}; + + m_ConsulClient->RegisterService(ServiceInfo); + ZEN_INFO("Submitted Consul registration for storage server instance for module '{}' at port {} as '{}'", + ModuleId, + Info.Port, + ServiceInfo.ServiceName); + break; + } + case HubInstanceState::Deprovisioning: + case HubInstanceState::Hibernating: + case HubInstanceState::Obliterating: + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Unprovisioned: + { + // A Consul registration is "live" while the module is in a register-state + // (Provisioning / Waking / Recovering / Provisioned). Deregister once when + // we leave a register-state into any non-register-state + const bool WasRegisteredState = + PreviousState == HubInstanceState::Provisioning || PreviousState == HubInstanceState::Waking || + PreviousState == HubInstanceState::Recovering || PreviousState == HubInstanceState::Provisioned; + if (WasRegisteredState) + { + m_ConsulClient->DeregisterService(ModuleId); + ZEN_INFO("Submitted Consul deregistration for storage server instance for module '{}' at port {}", ModuleId, Info.Port); + } + } + break; + default: + break; } - // Transitional states (Waking, Recovering, Crashed) and stable states - // not handled above (Hibernated) are intentionally ignored by Consul. } int diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index c372b131d..3d16a9188 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -16,7 +16,11 @@ #include <zenhttp/httpserver.h> +#include <unordered_set> + +ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END namespace zen::consul { @@ -111,12 +115,30 @@ ConsulProcess::StopConsulAgent() ////////////////////////////////////////////////////////////////////////// -ConsulClient::ConsulClient(const Configuration& Config) : m_Config(Config), m_HttpClient(m_Config.BaseUri) +ConsulClient::ConsulClient(const Configuration& Config) +: m_Config(Config) +, m_HttpClient(m_Config.BaseUri, + HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds{500}, .Timeout = std::chrono::milliseconds{500}}, + [this] { return m_Stop.load(); }) { + m_Worker = std::thread(&ConsulClient::WorkerLoop, this); } ConsulClient::~ConsulClient() { + try + { + m_Stop.store(true); + m_Wakeup.Set(); + if (m_Worker.joinable()) + { + m_Worker.join(); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("ConsulClient::~ConsulClient threw exception: {}", Ex.what()); + } } void @@ -162,9 +184,27 @@ ConsulClient::DeleteKey(std::string_view Key) } } -bool +void ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) { + PendingOp Op{PendingOp::Kind::Register, Info}; + m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); }); + m_Wakeup.Set(); +} + +void +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + PendingOp Op; + Op.Type = PendingOp::Kind::Deregister; + Op.Info.ServiceId = std::string(ServiceId); + m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); }); + m_Wakeup.Set(); +} + +bool +ConsulClient::DoRegister(const ServiceRegistrationInfo& Info) +{ using namespace std::literals; HttpClient::KeyValueMap AdditionalHeaders; @@ -223,7 +263,7 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) if (!Result) { - ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + ZEN_WARN("ConsulClient::DoRegister() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); return false; } @@ -231,7 +271,7 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) } bool -ConsulClient::DeregisterService(std::string_view ServiceId) +ConsulClient::DoDeregister(std::string_view ServiceId) { using namespace std::literals; @@ -264,13 +304,13 @@ ConsulClient::DeregisterService(std::string_view ServiceId) HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders); if (CatalogResult) { - ZEN_INFO("ConsulClient::DeregisterService() deregistered service '{}' via catalog fallback (agent error: {})", + ZEN_INFO("ConsulClient::DoDeregister() deregistered service '{}' via catalog fallback (agent error: {})", ServiceId, Result.ErrorMessage("")); return true; } - ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, catalog: {})", + ZEN_WARN("ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, catalog: {})", ServiceId, Result.ErrorMessage(""), CatalogResult.ErrorMessage("")); @@ -278,7 +318,7 @@ ConsulClient::DeregisterService(std::string_view ServiceId) else { ZEN_WARN( - "ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, could not determine node name for catalog " + "ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, could not determine node name for catalog " "fallback)", ServiceId, Result.ErrorMessage("")); @@ -390,8 +430,10 @@ ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int HttpClient::KeyValueMap AdditionalHeaders; ApplyCommonHeaders(AdditionalHeaders); - // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter - // governs the server-side bound on the blocking query. Do not add a separate client timeout. + // Note: m_HttpClient runs with a 500ms client-side timeout to keep Register/Deregister from + // stalling the hub state machine when the agent is unreachable. That bound applies here too: + // WaitSeconds is effectively capped at ~500ms regardless of the argument, so callers must + // treat this as a short-poll and loop rather than rely on a true blocking query. HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}}); HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders, Parameters); if (!Result) @@ -440,6 +482,82 @@ ConsulClient::GetAgentChecksJson() return Result.ToText(); } +void +ConsulClient::WorkerLoop() +{ + SetCurrentThreadName("ConsulClient"); + + std::unordered_set<std::string> RegisteredServices; + + while (true) + { + m_Wakeup.Wait(-1); + m_Wakeup.Reset(); + + const bool Stopping = m_Stop.load(); + + std::vector<PendingOp> Batch; + m_QueueLock.WithExclusiveLock([&] { Batch.swap(m_Queue); }); + + for (size_t Index = 0; Index < Batch.size(); ++Index) + { + PendingOp& Op = Batch[Index]; + + if (Stopping && Op.Type == PendingOp::Kind::Register) + { + continue; + } + + const std::string_view OpName = (Op.Type == PendingOp::Kind::Register) ? "register" : "deregister"; + + try + { + if (Op.Type == PendingOp::Kind::Register) + { + bool Ok = DoRegister(Op.Info); + if (Ok) + { + RegisteredServices.insert(Op.Info.ServiceId); + } + else + { + const size_t Remaining = Batch.size() - Index - 1; + ZEN_WARN("ConsulClient worker: {} for '{}' failed; dropping {} remaining queued op(s)", + OpName, + Op.Info.ServiceId, + Remaining); + break; + } + } + else + { + ZEN_ASSERT(Op.Type == PendingOp::Kind::Deregister); + if (RegisteredServices.erase(Op.Info.ServiceId) == 1u) + { + if (!DoDeregister(Op.Info.ServiceId)) + { + ZEN_WARN("ConsulClient worker: {} for '{}' failed", OpName, Op.Info.ServiceId); + } + } + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("ConsulClient worker: {} for '{}' threw: {}", OpName, Op.Info.ServiceId, Ex.what()); + } + catch (...) + { + ZEN_WARN("ConsulClient worker: {} for '{}' threw unknown exception", OpName, Op.Info.ServiceId); + } + } + + if (Stopping) + { + break; + } + } +} + ////////////////////////////////////////////////////////////////////////// ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) @@ -460,7 +578,7 @@ ServiceRegistration::~ServiceRegistration() if (m_IsRegistered.load()) { - if (!m_Client->DeregisterService(m_Info.ServiceId)) + if (!m_Client->DoDeregister(m_Info.ServiceId)) { ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); } @@ -500,7 +618,7 @@ ServiceRegistration::RegistrationLoop() // Try to register with exponential backoff for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt) { - if (m_Client->RegisterService(m_Info)) + if (m_Client->DoRegister(m_Info)) { Succeeded = true; break; @@ -512,7 +630,7 @@ ServiceRegistration::RegistrationLoop() } } - if (Succeeded || m_Client->RegisterService(m_Info)) + if (Succeeded || m_Client->DoRegister(m_Info)) { break; } @@ -685,17 +803,19 @@ TEST_CASE("util.consul.service_lifecycle") Info.HealthIntervalSeconds = 1; Info.DeregisterAfterSeconds = 60; + // Register/Deregister are async; wait for the worker to propagate to Consul. + // Phase 1: Register and verify Consul sends health checks to our service - REQUIRE(Client.RegisterService(Info)); - REQUIRE(Client.HasService(ServiceId)); + Client.RegisterService(Info); + REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50)); REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); // Phase 2: Explicit deregister - REQUIRE(Client.DeregisterService(ServiceId)); - CHECK_FALSE(Client.HasService(ServiceId)); + Client.DeregisterService(ServiceId); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); // Phase 3: Register with InitialStatus, verify immediately passing before any health check fires, // then fail health and verify check goes critical @@ -703,10 +823,13 @@ TEST_CASE("util.consul.service_lifecycle") HealthServer.Mock.FailHealth.store(false); Info.InitialStatus = "passing"; - REQUIRE(Client.RegisterService(Info)); - REQUIRE(Client.HasService(ServiceId)); + Client.RegisterService(Info); + REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50)); - CHECK_EQ(HealthServer.Mock.HealthCheckCount.load(), 0); + // Registration is async; by the time HasService observes the service the 1s health interval + // may already have fired, so we can't robustly assert HealthCheckCount==0. The "passing" status + // below still proves InitialStatus applied (it can only be "passing" via InitialStatus or a + // successful health check - both are acceptable demonstrations). CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); @@ -719,12 +842,12 @@ TEST_CASE("util.consul.service_lifecycle") CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical"); // Phase 4: Explicit deregister while critical - REQUIRE(Client.DeregisterService(ServiceId)); - CHECK_FALSE(Client.HasService(ServiceId)); + Client.DeregisterService(ServiceId); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); // Phase 5: Deregister an already-deregistered service - should not crash Client.DeregisterService(ServiceId); - CHECK_FALSE(Client.HasService(ServiceId)); + REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50)); ConsulProc.StopConsulAgent(); } diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index c3d0e5f1d..4efb10263 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -3,6 +3,7 @@ #pragma once #include <zenbase/zenbase.h> +#include <zencore/thread.h> #include <zenhttp/httpclient.h> #include <atomic> @@ -10,6 +11,7 @@ #include <string> #include <string_view> #include <thread> +#include <vector> namespace zen::consul { @@ -46,8 +48,15 @@ public: std::string GetKeyValue(std::string_view Key); void DeleteKey(std::string_view Key); - bool RegisterService(const ServiceRegistrationInfo& Info); - bool DeregisterService(std::string_view ServiceId); + // Async. Enqueue onto the worker thread and return immediately. + // Transport outcome is not reported to the caller. + void RegisterService(const ServiceRegistrationInfo& Info); + void DeregisterService(std::string_view ServiceId); + + // Synchronous counterparts. Block on the HTTP call and return true on + // success. Use when the caller needs a result (e.g. a retry loop). + bool DoRegister(const ServiceRegistrationInfo& Info); + bool DoDeregister(std::string_view ServiceId); // Query methods for testing bool HasService(std::string_view ServiceId); @@ -61,12 +70,29 @@ public: bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds); private: + struct PendingOp + { + enum class Kind + { + Register, + Deregister + }; + Kind Type; + ServiceRegistrationInfo Info; + }; + static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId); void ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap); std::string GetNodeName(); - - Configuration m_Config; - HttpClient m_HttpClient; + void WorkerLoop(); + + Configuration m_Config; + std::atomic<bool> m_Stop{false}; + HttpClient m_HttpClient; + RwLock m_QueueLock; + std::vector<PendingOp> m_Queue; + Event m_Wakeup; + std::thread m_Worker; }; class ConsulProcess |