aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-21 10:43:57 +0200
committerGitHub Enterprise <[email protected]>2026-04-21 10:43:57 +0200
commitc4d4f9f1366e7ba33b587e80558b204cfdf62c59 (patch)
treee43d36eecfda95e5cafc0ccc7bedeabe788d9dd3
parentbuilds download "default" part if nothing is specified (#994) (diff)
downloadarchived-zen-c4d4f9f1366e7ba33b587e80558b204cfdf62c59.tar.xz
archived-zen-c4d4f9f1366e7ba33b587e80558b204cfdf62c59.zip
async consul register/deregister (#992)
- Improvement: Hub Consul service registration and deregistration are now dispatched on a dedicated background thread so instance state transitions no longer stall when the Consul agent is slow or unreachable
-rw-r--r--CHANGELOG.md1
-rw-r--r--src/zenserver-test/hub-tests.cpp3
-rw-r--r--src/zenserver/hub/zenhubserver.cpp88
-rw-r--r--src/zenutil/consul/consul.cpp167
-rw-r--r--src/zenutil/include/zenutil/consul.h36
5 files changed, 228 insertions, 67 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ddfc47a5b..bb097b684 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@
- Improvement: Sensitive values in command line arguments (credentials, tokens, passwords, connection strings) are scrubbed before being written to logs or sent to Sentry
- Improvement: IMDS credential refresh log message no longer exposes the first 8 characters of the AWS AccessKeyId
- Improvement: `zen builds download`, `zen builds ls`, and `zen builds prime-cache` now default to the part named `default` when no `--build-part-id`/`--build-part-name` is given (previously all parts were selected). Pass `--build-part-name=*` to select all parts.
+- Improvement: Hub Consul service registration and deregistration are now dispatched on a dedicated background thread so instance state transitions no longer stall when the Consul agent is slow or unreachable
- Bugfix: `builds download` partial-block fetch decisions now account for build storage host latency
- Bugfix: Transfer rate displays in `builds` commands now smooth correctly
- Bugfix: Structured cache PUT errors with a detail body no longer write the HTTP response twice
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 35a840e5d..e83e18446 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -663,10 +663,11 @@ TEST_CASE("hub.consul.provision.registration")
Result = HubClient.Post("modules/testmod/deprovision");
REQUIRE(Result);
REQUIRE(WaitForConsulService(Client, "testmod", false, 10000));
+ REQUIRE(WaitForModuleGone(HubClient, "testmod"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
}
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 1390d112e..ebc2cf2f1 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -468,47 +468,57 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
return;
}
- if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned)
+ switch (NewState)
{
- consul::ServiceRegistrationInfo ServiceInfo{
- .ServiceId = std::string(ModuleId),
- .ServiceName = "zen-storage",
- .Port = Info.Port,
- .HealthEndpoint = "health",
- .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
- std::make_pair("zen-hub", std::string(HubInstanceId)),
- std::make_pair("version", std::string(ZEN_CFG_VERSION))},
- .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulHealthIntervalSeconds,
- .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulDeregisterAfterSeconds,
- .InitialStatus = NewState == HubInstanceState::Provisioned ? "passing" : ""};
-
- if (!m_ConsulClient->RegisterService(ServiceInfo))
- {
- ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId);
- }
- else
- {
- ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'",
- ModuleId,
- Info.Port,
- ServiceInfo.ServiceName);
- }
- }
- else if (NewState == HubInstanceState::Unprovisioned)
- {
- if (!m_ConsulClient->DeregisterService(ModuleId))
- {
- ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway",
- ModuleId,
- Info.Port);
- }
- else
- {
- ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
- }
+ case HubInstanceState::Provisioning:
+ case HubInstanceState::Waking:
+ case HubInstanceState::Recovering:
+ case HubInstanceState::Provisioned:
+ {
+ const bool IsProvisioned = NewState == HubInstanceState::Provisioned;
+
+ consul::ServiceRegistrationInfo ServiceInfo{
+ .ServiceId = std::string(ModuleId),
+ .ServiceName = "zen-storage",
+ .Port = Info.Port,
+ .HealthEndpoint = "health",
+ .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
+ std::make_pair("zen-hub", std::string(HubInstanceId)),
+ std::make_pair("version", std::string(ZEN_CFG_VERSION))},
+ .HealthIntervalSeconds = IsProvisioned ? m_ConsulHealthIntervalSeconds : 0u,
+ .DeregisterAfterSeconds = IsProvisioned ? m_ConsulDeregisterAfterSeconds : 0u,
+ .InitialStatus = IsProvisioned ? "passing" : ""};
+
+ m_ConsulClient->RegisterService(ServiceInfo);
+ ZEN_INFO("Submitted Consul registration for storage server instance for module '{}' at port {} as '{}'",
+ ModuleId,
+ Info.Port,
+ ServiceInfo.ServiceName);
+ break;
+ }
+ case HubInstanceState::Deprovisioning:
+ case HubInstanceState::Hibernating:
+ case HubInstanceState::Obliterating:
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Unprovisioned:
+ {
+ // A Consul registration is "live" while the module is in a register-state
+ // (Provisioning / Waking / Recovering / Provisioned). Deregister once when
+ // we leave a register-state into any non-register-state
+ const bool WasRegisteredState =
+ PreviousState == HubInstanceState::Provisioning || PreviousState == HubInstanceState::Waking ||
+ PreviousState == HubInstanceState::Recovering || PreviousState == HubInstanceState::Provisioned;
+ if (WasRegisteredState)
+ {
+ m_ConsulClient->DeregisterService(ModuleId);
+ ZEN_INFO("Submitted Consul deregistration for storage server instance for module '{}' at port {}", ModuleId, Info.Port);
+ }
+ }
+ break;
+ default:
+ break;
}
- // Transitional states (Waking, Recovering, Crashed) and stable states
- // not handled above (Hibernated) are intentionally ignored by Consul.
}
int
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index c372b131d..3d16a9188 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -16,7 +16,11 @@
#include <zenhttp/httpserver.h>
+#include <unordered_set>
+
+ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::consul {
@@ -111,12 +115,30 @@ ConsulProcess::StopConsulAgent()
//////////////////////////////////////////////////////////////////////////
-ConsulClient::ConsulClient(const Configuration& Config) : m_Config(Config), m_HttpClient(m_Config.BaseUri)
+ConsulClient::ConsulClient(const Configuration& Config)
+: m_Config(Config)
+, m_HttpClient(m_Config.BaseUri,
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds{500}, .Timeout = std::chrono::milliseconds{500}},
+ [this] { return m_Stop.load(); })
{
+ m_Worker = std::thread(&ConsulClient::WorkerLoop, this);
}
ConsulClient::~ConsulClient()
{
+ try
+ {
+ m_Stop.store(true);
+ m_Wakeup.Set();
+ if (m_Worker.joinable())
+ {
+ m_Worker.join();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("ConsulClient::~ConsulClient threw exception: {}", Ex.what());
+ }
}
void
@@ -162,9 +184,27 @@ ConsulClient::DeleteKey(std::string_view Key)
}
}
-bool
+void
ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
{
+ PendingOp Op{PendingOp::Kind::Register, Info};
+ m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); });
+ m_Wakeup.Set();
+}
+
+void
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+ PendingOp Op;
+ Op.Type = PendingOp::Kind::Deregister;
+ Op.Info.ServiceId = std::string(ServiceId);
+ m_QueueLock.WithExclusiveLock([&] { m_Queue.push_back(std::move(Op)); });
+ m_Wakeup.Set();
+}
+
+bool
+ConsulClient::DoRegister(const ServiceRegistrationInfo& Info)
+{
using namespace std::literals;
HttpClient::KeyValueMap AdditionalHeaders;
@@ -223,7 +263,7 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
if (!Result)
{
- ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+ ZEN_WARN("ConsulClient::DoRegister() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
return false;
}
@@ -231,7 +271,7 @@ ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
}
bool
-ConsulClient::DeregisterService(std::string_view ServiceId)
+ConsulClient::DoDeregister(std::string_view ServiceId)
{
using namespace std::literals;
@@ -264,13 +304,13 @@ ConsulClient::DeregisterService(std::string_view ServiceId)
HttpClient::Response CatalogResult = m_HttpClient.Put("v1/catalog/deregister", PayloadBuffer, AdditionalHeaders);
if (CatalogResult)
{
- ZEN_INFO("ConsulClient::DeregisterService() deregistered service '{}' via catalog fallback (agent error: {})",
+ ZEN_INFO("ConsulClient::DoDeregister() deregistered service '{}' via catalog fallback (agent error: {})",
ServiceId,
Result.ErrorMessage(""));
return true;
}
- ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, catalog: {})",
+ ZEN_WARN("ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, catalog: {})",
ServiceId,
Result.ErrorMessage(""),
CatalogResult.ErrorMessage(""));
@@ -278,7 +318,7 @@ ConsulClient::DeregisterService(std::string_view ServiceId)
else
{
ZEN_WARN(
- "ConsulClient::DeregisterService() failed to deregister service '{}' (agent: {}, could not determine node name for catalog "
+ "ConsulClient::DoDeregister() failed to deregister service '{}' (agent: {}, could not determine node name for catalog "
"fallback)",
ServiceId,
Result.ErrorMessage(""));
@@ -390,8 +430,10 @@ ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int
HttpClient::KeyValueMap AdditionalHeaders;
ApplyCommonHeaders(AdditionalHeaders);
- // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter
- // governs the server-side bound on the blocking query. Do not add a separate client timeout.
+ // Note: m_HttpClient runs with a 500ms client-side timeout to keep Register/Deregister from
+ // stalling the hub state machine when the agent is unreachable. That bound applies here too:
+ // WaitSeconds is effectively capped at ~500ms regardless of the argument, so callers must
+ // treat this as a short-poll and loop rather than rely on a true blocking query.
HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}});
HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", AdditionalHeaders, Parameters);
if (!Result)
@@ -440,6 +482,82 @@ ConsulClient::GetAgentChecksJson()
return Result.ToText();
}
+void
+ConsulClient::WorkerLoop()
+{
+ SetCurrentThreadName("ConsulClient");
+
+ std::unordered_set<std::string> RegisteredServices;
+
+ while (true)
+ {
+ m_Wakeup.Wait(-1);
+ m_Wakeup.Reset();
+
+ const bool Stopping = m_Stop.load();
+
+ std::vector<PendingOp> Batch;
+ m_QueueLock.WithExclusiveLock([&] { Batch.swap(m_Queue); });
+
+ for (size_t Index = 0; Index < Batch.size(); ++Index)
+ {
+ PendingOp& Op = Batch[Index];
+
+ if (Stopping && Op.Type == PendingOp::Kind::Register)
+ {
+ continue;
+ }
+
+ const std::string_view OpName = (Op.Type == PendingOp::Kind::Register) ? "register" : "deregister";
+
+ try
+ {
+ if (Op.Type == PendingOp::Kind::Register)
+ {
+ bool Ok = DoRegister(Op.Info);
+ if (Ok)
+ {
+ RegisteredServices.insert(Op.Info.ServiceId);
+ }
+ else
+ {
+ const size_t Remaining = Batch.size() - Index - 1;
+ ZEN_WARN("ConsulClient worker: {} for '{}' failed; dropping {} remaining queued op(s)",
+ OpName,
+ Op.Info.ServiceId,
+ Remaining);
+ break;
+ }
+ }
+ else
+ {
+ ZEN_ASSERT(Op.Type == PendingOp::Kind::Deregister);
+ if (RegisteredServices.erase(Op.Info.ServiceId) == 1u)
+ {
+ if (!DoDeregister(Op.Info.ServiceId))
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' failed", OpName, Op.Info.ServiceId);
+ }
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' threw: {}", OpName, Op.Info.ServiceId, Ex.what());
+ }
+ catch (...)
+ {
+ ZEN_WARN("ConsulClient worker: {} for '{}' threw unknown exception", OpName, Op.Info.ServiceId);
+ }
+ }
+
+ if (Stopping)
+ {
+ break;
+ }
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
@@ -460,7 +578,7 @@ ServiceRegistration::~ServiceRegistration()
if (m_IsRegistered.load())
{
- if (!m_Client->DeregisterService(m_Info.ServiceId))
+ if (!m_Client->DoDeregister(m_Info.ServiceId))
{
ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
}
@@ -500,7 +618,7 @@ ServiceRegistration::RegistrationLoop()
// Try to register with exponential backoff
for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt)
{
- if (m_Client->RegisterService(m_Info))
+ if (m_Client->DoRegister(m_Info))
{
Succeeded = true;
break;
@@ -512,7 +630,7 @@ ServiceRegistration::RegistrationLoop()
}
}
- if (Succeeded || m_Client->RegisterService(m_Info))
+ if (Succeeded || m_Client->DoRegister(m_Info))
{
break;
}
@@ -685,17 +803,19 @@ TEST_CASE("util.consul.service_lifecycle")
Info.HealthIntervalSeconds = 1;
Info.DeregisterAfterSeconds = 60;
+ // Register/Deregister are async; wait for the worker to propagate to Consul.
+
// Phase 1: Register and verify Consul sends health checks to our service
- REQUIRE(Client.RegisterService(Info));
- REQUIRE(Client.HasService(ServiceId));
+ Client.RegisterService(Info);
+ REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50));
REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1);
CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
// Phase 2: Explicit deregister
- REQUIRE(Client.DeregisterService(ServiceId));
- CHECK_FALSE(Client.HasService(ServiceId));
+ Client.DeregisterService(ServiceId);
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
// Phase 3: Register with InitialStatus, verify immediately passing before any health check fires,
// then fail health and verify check goes critical
@@ -703,10 +823,13 @@ TEST_CASE("util.consul.service_lifecycle")
HealthServer.Mock.FailHealth.store(false);
Info.InitialStatus = "passing";
- REQUIRE(Client.RegisterService(Info));
- REQUIRE(Client.HasService(ServiceId));
+ Client.RegisterService(Info);
+ REQUIRE(WaitForCondition([&]() { return Client.HasService(ServiceId); }, 10000, 50));
- CHECK_EQ(HealthServer.Mock.HealthCheckCount.load(), 0);
+ // Registration is async; by the time HasService observes the service the 1s health interval
+ // may already have fired, so we can't robustly assert HealthCheckCount==0. The "passing" status
+ // below still proves InitialStatus applied (it can only be "passing" via InitialStatus or a
+ // successful health check - both are acceptable demonstrations).
CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
@@ -719,12 +842,12 @@ TEST_CASE("util.consul.service_lifecycle")
CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical");
// Phase 4: Explicit deregister while critical
- REQUIRE(Client.DeregisterService(ServiceId));
- CHECK_FALSE(Client.HasService(ServiceId));
+ Client.DeregisterService(ServiceId);
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
// Phase 5: Deregister an already-deregistered service - should not crash
Client.DeregisterService(ServiceId);
- CHECK_FALSE(Client.HasService(ServiceId));
+ REQUIRE(WaitForCondition([&]() { return !Client.HasService(ServiceId); }, 10000, 50));
ConsulProc.StopConsulAgent();
}
diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h
index c3d0e5f1d..4efb10263 100644
--- a/src/zenutil/include/zenutil/consul.h
+++ b/src/zenutil/include/zenutil/consul.h
@@ -3,6 +3,7 @@
#pragma once
#include <zenbase/zenbase.h>
+#include <zencore/thread.h>
#include <zenhttp/httpclient.h>
#include <atomic>
@@ -10,6 +11,7 @@
#include <string>
#include <string_view>
#include <thread>
+#include <vector>
namespace zen::consul {
@@ -46,8 +48,15 @@ public:
std::string GetKeyValue(std::string_view Key);
void DeleteKey(std::string_view Key);
- bool RegisterService(const ServiceRegistrationInfo& Info);
- bool DeregisterService(std::string_view ServiceId);
+ // Async. Enqueue onto the worker thread and return immediately.
+ // Transport outcome is not reported to the caller.
+ void RegisterService(const ServiceRegistrationInfo& Info);
+ void DeregisterService(std::string_view ServiceId);
+
+ // Synchronous counterparts. Block on the HTTP call and return true on
+ // success. Use when the caller needs a result (e.g. a retry loop).
+ bool DoRegister(const ServiceRegistrationInfo& Info);
+ bool DoDeregister(std::string_view ServiceId);
// Query methods for testing
bool HasService(std::string_view ServiceId);
@@ -61,12 +70,29 @@ public:
bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds);
private:
+ struct PendingOp
+ {
+ enum class Kind
+ {
+ Register,
+ Deregister
+ };
+ Kind Type;
+ ServiceRegistrationInfo Info;
+ };
+
static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId);
void ApplyCommonHeaders(HttpClient::KeyValueMap& InOutHeaderMap);
std::string GetNodeName();
-
- Configuration m_Config;
- HttpClient m_HttpClient;
+ void WorkerLoop();
+
+ Configuration m_Config;
+ std::atomic<bool> m_Stop{false};
+ HttpClient m_HttpClient;
+ RwLock m_QueueLock;
+ std::vector<PendingOp> m_Queue;
+ Event m_Wakeup;
+ std::thread m_Worker;
};
class ConsulProcess