aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-11 09:45:31 +0100
committerGitHub Enterprise <[email protected]>2026-03-11 09:45:31 +0100
commite9d04250225a430cffed28e6a49299e3da542f97 (patch)
tree7f99594bd930c072e159458319e822bbb18794f7 /src
parentminor zenstore/blockstore fixes (#821) (diff)
downloadzen-e9d04250225a430cffed28e6a49299e3da542f97.tar.xz
zen-e9d04250225a430cffed28e6a49299e3da542f97.zip
hub consul integration (#820)
- Feature: Basic consul integration for zenserver hub mode, restricted to the host-local consul agent and register/deregister of services
- Feature: Added new options to zenserver hub mode
  - `consul-endpoint` - Consul endpoint URL for service registration (empty = disabled)
  - `hub-base-port-number` - Base port number for provisioned instances
  - `hub-instance-limit` - Maximum number of provisioned instances for this hub
  - `hub-use-job-object` - Enable the use of a Windows Job Object for child process management (Windows only)
Diffstat (limited to 'src')
-rw-r--r--src/zenserver-test/hub-tests.cpp137
-rw-r--r--src/zenserver/hub/httphubservice.cpp (renamed from src/zenserver/hub/hubservice.cpp)6
-rw-r--r--src/zenserver/hub/httphubservice.h (renamed from src/zenserver/hub/hubservice.h)0
-rw-r--r--src/zenserver/hub/hub.cpp507
-rw-r--r--src/zenserver/hub/hub.h50
-rw-r--r--src/zenserver/hub/zenhubserver.cpp155
-rw-r--r--src/zenserver/hub/zenhubserver.h16
-rw-r--r--src/zenserver/storage/upstream/zen.cpp2
-rw-r--r--src/zenutil/consul/consul.cpp251
-rw-r--r--src/zenutil/include/zenutil/consul.h62
-rw-r--r--src/zenutil/include/zenutil/zenserverprocess.h25
-rw-r--r--src/zenutil/zenserverprocess.cpp33
12 files changed, 1200 insertions, 44 deletions
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 11531e30f..9c1669256 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -19,6 +19,8 @@
# include <zenutil/zenserverprocess.h>
# include <zenhttp/httpclient.h>
# include <zenutil/consul.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
namespace zen::tests::hub {
@@ -230,12 +232,33 @@ TEST_CASE("hub.lifecycle.children")
}
}
-TEST_CASE("hub.consul.lifecycle" * doctest::skip())
+// Waits until the presence of ServiceId on the Consul agent equals ExpectedState, or until
+// TimeoutMs has elapsed.
+//
+// @param Client         Consul client used for the watch and the final presence check.
+// @param ServiceId      Service ID to look for.
+// @param ExpectedState  true to wait for the service to be present, false to wait for it to be gone.
+// @param TimeoutMs      Time budget in milliseconds; zero or negative skips waiting and only
+//                       performs the final presence check.
+// @return true if the expected state was observed, either by a watch or by the final check.
+static bool
+WaitForConsulService(consul::ConsulClient& Client, std::string_view ServiceId, bool ExpectedState, int TimeoutMs)
{
- zen::consul::ConsulProcess ConsulProc;
+    // Clamp so a negative timeout cannot wrap around to an enormous unsigned budget.
+    const uint64_t TimeoutBudgetMs = TimeoutMs > 0 ? static_cast<uint64_t>(TimeoutMs) : 0;
+
+    Stopwatch Timer;
+    uint64_t  Index = 0;
+    for (;;)
+    {
+        // Sample the clock exactly once per iteration. Reading it separately for the loop test
+        // and for the subtraction lets the deadline slip past in between, which would make the
+        // unsigned subtraction below wrap around.
+        const uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
+        if (ElapsedMs >= TimeoutBudgetMs)
+        {
+            break;
+        }
+
+        const uint64_t RemainingMs = TimeoutBudgetMs - ElapsedMs;
+        const int      WaitSeconds = std::max(1, static_cast<int>(RemainingMs / 1000));
+        if (Client.WatchService(ServiceId, Index, WaitSeconds) == ExpectedState)
+        {
+            return true;
+        }
+        // NOTE(review): presumably WatchService advances Index on a successful query, so a
+        // zero Index here means the request itself failed - confirm against ConsulClient.
+        if (Index == 0)
+        {
+            Sleep(100);  // error path only: avoid tight loop on persistent connection failure
+        }
+    }
+    return Client.HasService(ServiceId) == ExpectedState;
+}
+
+TEST_CASE("hub.consul.kv")
+{
+ consul::ConsulProcess ConsulProc;
ConsulProc.SpawnConsulAgent();
- zen::consul::ConsulClient Client("http://localhost:8500/");
+ consul::ConsulClient Client("http://localhost:8500/");
Client.SetKeyValue("zen/hub/testkey", "testvalue");
std::string RetrievedValue = Client.GetKeyValue("zen/hub/testkey");
@@ -246,6 +269,114 @@ TEST_CASE("hub.consul.lifecycle" * doctest::skip())
ConsulProc.StopConsulAgent();
}
+// Starts a local Consul agent and a hub server pointed at it, waits for the hub to show up
+// as service "zen-hub-test-instance", then shuts the hub down and checks the service is gone.
+TEST_CASE("hub.consul.hub.registration")
+{
+    constexpr std::string_view kHubServiceId = "zen-hub-test-instance";
+
+    consul::ConsulProcess Agent;
+    Agent.SpawnConsulAgent();
+
+    ZenServerInstance HubServer(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+    const uint16_t    HubPort =
+        HubServer.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ --instance-id=test-instance");
+    REQUIRE(HubPort != 0);
+
+    consul::ConsulClient Consul("http://localhost:8500/");
+
+    // Allow up to five seconds for the registration to become visible.
+    const bool Registered = WaitForConsulService(Consul, kHubServiceId, true, 5000);
+    REQUIRE(Registered);
+
+    HubServer.Shutdown();
+
+    const bool StillRegistered = Consul.HasService(kHubServiceId);
+    CHECK(!StillRegistered);
+
+    Agent.StopConsulAgent();
+}
+
+// End-to-end check of module registration: starts a local Consul agent and a hub server
+// pointed at it, provisions module "testmod" through the hub's HTTP API, inspects the
+// agent's services JSON for both the module and the hub entries, then deprovisions the
+// module and checks that its service entry is gone.
+TEST_CASE("hub.consul.provision.registration")
+{
+ consul::ConsulProcess ConsulProc;
+ ConsulProc.SpawnConsulAgent();
+
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ --instance-id=test-instance");
+ REQUIRE(PortNumber != 0);
+
+ consul::ConsulClient Client("http://localhost:8500/");
+
+ // The hub itself must be registered before any module is provisioned.
+ REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000));
+
+ HttpClient HubClient(Instance.GetBaseUri() + "/hub/");
+
+ HttpClient::Response Result = HubClient.Post("modules/testmod/provision");
+ REQUIRE(Result);
+
+ CHECK(Client.HasService("testmod"));
+ {
+ // Port reported by the hub for the provisioned module (0 is the fallback when the field is missing).
+ const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE(ModulePort != 0);
+
+ std::string JsonError;
+ CbFieldIterator ServicesRoot = LoadCompactBinaryFromJson(Client.GetAgentServicesJson(), JsonError);
+ REQUIRE(JsonError.empty());
+
+ // Keep the last top-level field that is an object; it is used as the service-id -> service map.
+ CbObjectView ServicesMap;
+ for (CbFieldView F : ServicesRoot)
+ {
+ if (F.IsObject())
+ {
+ ServicesMap = F.AsObjectView();
+ }
+ }
+ REQUIRE(ServicesMap);
+
+ // Verify fields registered by OnProvisioned
+ {
+ CbObjectView ModService = ServicesMap["testmod"].AsObjectView();
+ CHECK_EQ(ModService["ID"sv].AsString(), "testmod"sv);
+ CHECK_EQ(ModService["Service"sv].AsString(), "zen-storage"sv);
+ // NOTE(review): the port is read as a double - presumably because the JSON loader yields floating-point numbers; confirm.
+ CHECK_EQ(ModService["Port"sv].AsDouble(0), double(ModulePort));
+ bool FoundModuleTag = false;
+ bool FoundHubTag = false;
+ bool FoundVersionTag = false;
+ // Each expected tag only needs to appear once; the version tag is matched by prefix because its value is not fixed here.
+ for (CbFieldView Tag : ModService["Tags"].AsArrayView())
+ {
+ std::string_view TagStr = Tag.AsString();
+ if (TagStr == "module:testmod"sv)
+ {
+ FoundModuleTag = true;
+ }
+ else if (TagStr == "zen-hub:zen-hub-test-instance"sv)
+ {
+ FoundHubTag = true;
+ }
+ else if (TagStr.substr(0, 8) == "version:"sv)
+ {
+ FoundVersionTag = true;
+ }
+ }
+ CHECK(FoundModuleTag);
+ CHECK(FoundHubTag);
+ CHECK(FoundVersionTag);
+ }
+
+ // Verify fields registered by InitializeConsulRegistration
+ {
+ CbObjectView HubService = ServicesMap["zen-hub-test-instance"].AsObjectView();
+ CHECK_EQ(HubService["ID"sv].AsString(), "zen-hub-test-instance"sv);
+ CHECK_EQ(HubService["Service"sv].AsString(), "zen-hub"sv);
+ CHECK_EQ(HubService["Port"sv].AsDouble(0), double(PortNumber));
+ }
+ }
+
+ Result = HubClient.Post("modules/testmod/deprovision");
+ REQUIRE(Result);
+
+ // The module's service entry is expected to be gone once the deprovision request has returned.
+ CHECK(!Client.HasService("testmod"));
+
+ Instance.Shutdown();
+
+ ConsulProc.StopConsulAgent();
+}
+
TEST_SUITE_END();
} // namespace zen::tests::hub
diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 6765340dc..67ed0cfd8 100644
--- a/src/zenserver/hub/hubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "hubservice.h"
+#include "httphubservice.h"
#include "hub.h"
#include "storageserverinstance.h"
@@ -74,7 +74,7 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
try
{
- Hub::ProvisionedInstanceInfo Info;
+ HubProvisionedInstanceInfo Info;
if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason))
{
CbObjectWriter Obj;
@@ -144,7 +144,7 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
CbObjectWriter Obj;
Obj << "currentInstanceCount" << m_Hub.GetInstanceCount();
Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount();
- Obj << "instanceLimit" << m_Hub.GetInstanceLimit();
+ Obj << "instanceLimit" << m_Hub.GetConfig().InstanceLimit;
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
},
HttpVerb::kGet);
diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/httphubservice.h
index d08eeea2a..d08eeea2a 100644
--- a/src/zenserver/hub/hubservice.h
+++ b/src/zenserver/hub/httphubservice.h
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index f1cc86de6..c9720b32d 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -15,6 +15,13 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#if ZEN_WITH_TESTS
+# include <zencore/filesystem.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+#endif
+
namespace zen {
///////////////////////////////////////////////////////////////////////////
@@ -111,14 +118,19 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject)
+Hub::Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ ProvisionModuleCallbackFunc&& ProvisionedModuleCallback,
+ ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback)
+: m_Config(Config)
+, m_RunEnvironment(std::move(RunEnvironment))
+, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback))
+, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback))
{
m_HostMetrics = GetSystemMetrics();
m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
- m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir);
-
m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);
@@ -128,13 +140,10 @@ Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject)
// This is necessary to ensure the hub assigns a distinct port range.
// We need to do this primarily because otherwise automated tests will
// fail as the test runner will create processes in the default range.
- // We should probably make this configurable or dynamic for maximum
- // flexibility, and to allow running multiple hubs on the same host if
- // necessary.
- m_RunEnvironment.SetNextPortNumber(21000);
+ m_RunEnvironment.SetNextPortNumber(m_Config.BasePortNumber);
#if ZEN_PLATFORM_WINDOWS
- if (m_UseJobObject)
+ if (m_Config.UseJobObject)
{
m_JobObject.Initialize();
if (m_JobObject.IsValid())
@@ -158,6 +167,20 @@ Hub::~Hub()
m_Lock.WithExclusiveLock([this] {
for (auto& [ModuleId, Instance] : m_Instances)
{
+ uint16_t BasePort = Instance->GetBasePort();
+ std::string BaseUri; // TODO?
+
+ if (m_DeprovisionedModuleCallback)
+ {
+ try
+ {
+ m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ }
+ }
Instance->Deprovision();
}
m_Instances.clear();
@@ -170,7 +193,7 @@ Hub::~Hub()
}
bool
-Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason)
{
StorageServerInstance* Instance = nullptr;
bool IsNewInstance = false;
@@ -241,9 +264,20 @@ Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std:
}
OutInfo.Port = Instance->GetBasePort();
-
// TODO: base URI? Would need to know what host name / IP to use
+ if (m_ProvisionedModuleCallback)
+ {
+ try
+ {
+ m_ProvisionedModuleCallback(ModuleId, OutInfo);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ }
+ }
+
return true;
}
@@ -279,6 +313,21 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
}
}
+ uint16_t BasePort = Instance->GetBasePort();
+ std::string BaseUri; // TODO?
+
+ if (m_DeprovisionedModuleCallback)
+ {
+ try
+ {
+ m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ }
+ }
+
// The module is deprovisioned outside the lock to avoid blocking other operations.
//
// To ensure that no new provisioning can occur while we're deprovisioning,
@@ -362,9 +411,9 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return false;
}
- if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit)
+ if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit)
{
- OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit);
+ OutReason = fmt::format("instance limit exceeded ({})", m_Config.InstanceLimit);
return false;
}
@@ -374,6 +423,438 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return true;
}
-///////////////////////////////////////////////////////////////////////////
+#if ZEN_WITH_TESTS
+
+TEST_SUITE_BEGIN("server.hub");
+
+// Helpers shared by the hub unit tests below.
+namespace hub_testutils {
+
+    // Builds a hub-mode server environment rooted at BaseDir, using the directory of the
+    // currently running executable as the executable directory.
+    ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
+    {
+        const std::filesystem::path ExecutableDir = GetRunningExecutablePath().parent_path();
+        return ZenServerEnvironment(ZenServerEnvironment::Hub, ExecutableDir, BaseDir);
+    }
+
+    // Creates a Hub whose environment lives under BaseDir. Config and both callbacks are
+    // optional; default-constructed values are used when they are omitted.
+    std::unique_ptr<Hub> MakeHub(const std::filesystem::path&     BaseDir,
+                                 Hub::Configuration               Config              = {},
+                                 Hub::ProvisionModuleCallbackFunc ProvisionCallback   = {},
+                                 Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {})
+    {
+        ZenServerEnvironment Environment = MakeHubEnvironment(BaseDir);
+        return std::make_unique<Hub>(Config, std::move(Environment), std::move(ProvisionCallback), std::move(DeprovisionCallback));
+    }
+
+}  // namespace hub_testutils
+
+// Round-trips a single module through Provision/Deprovision on a default-configured hub and
+// checks the instance count and Find() result before, between and after the two calls.
+TEST_CASE("hub.provision_basic")
+{
+    ScopedTemporaryDirectory   Scratch;
+    const std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path());
+
+    // Nothing is provisioned on a fresh hub.
+    CHECK_EQ(TestHub->GetInstanceCount(), 0);
+    CHECK_FALSE(TestHub->Find("module_a"));
+
+    HubProvisionedInstanceInfo Provisioned;
+    std::string                FailureReason;
+
+    const bool Provisioning = TestHub->Provision("module_a", Provisioned, FailureReason);
+    REQUIRE_MESSAGE(Provisioning, FailureReason);
+    CHECK_NE(Provisioned.Port, 0);
+    CHECK_EQ(TestHub->GetInstanceCount(), 1);
+    CHECK(TestHub->Find("module_a"));
+
+    const bool Deprovisioning = TestHub->Deprovision("module_a", FailureReason);
+    CHECK(Deprovisioning);
+    CHECK_EQ(TestHub->GetInstanceCount(), 0);
+    CHECK_FALSE(TestHub->Find("module_a"));
+}
+
+// Checks that the provision and deprovision callbacks handed to the hub each fire once for a
+// single module round trip, and that they receive the module id and a non-zero port.
+TEST_CASE("hub.provision_callbacks")
+{
+    ScopedTemporaryDirectory Scratch;
+
+    struct CallbackRecord
+    {
+        std::string ModuleId;
+        uint16_t    Port;
+    };
+    RwLock                      RecordLock;
+    std::vector<CallbackRecord> Provisioned;
+    std::vector<CallbackRecord> Deprovisioned;
+
+    // Produces a callback that appends (module id, port) to Sink under RecordLock.
+    const auto MakeRecorder = [&RecordLock](std::vector<CallbackRecord>& Sink) {
+        return [&RecordLock, &Sink](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
+            RecordLock.WithExclusiveLock([&]() { Sink.push_back({std::string(ModuleId), Info.Port}); });
+        };
+    };
+
+    std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path(), {}, MakeRecorder(Provisioned), MakeRecorder(Deprovisioned));
+
+    HubProvisionedInstanceInfo Info;
+    std::string                FailureReason;
+
+    const bool Provisioning = TestHub->Provision("cb_module", Info, FailureReason);
+    REQUIRE_MESSAGE(Provisioning, FailureReason);
+
+    {
+        RwLock::SharedLockScope _(RecordLock);
+        REQUIRE_EQ(Provisioned.size(), 1u);
+        const CallbackRecord& Seen = Provisioned[0];
+        CHECK_EQ(Seen.ModuleId, "cb_module");
+        CHECK_EQ(Seen.Port, Info.Port);
+        CHECK_NE(Seen.Port, 0);
+    }
+
+    const bool Deprovisioning = TestHub->Deprovision("cb_module", FailureReason);
+    CHECK(Deprovisioning);
+
+    {
+        RwLock::SharedLockScope _(RecordLock);
+        REQUIRE_EQ(Deprovisioned.size(), 1u);
+        const CallbackRecord& Seen = Deprovisioned[0];
+        CHECK_EQ(Seen.ModuleId, "cb_module");
+        CHECK_NE(Seen.Port, 0);
+        // Deprovisioning must not have produced an additional provision callback.
+        CHECK_EQ(Provisioned.size(), 1u);
+    }
+}
+
+// Exercises Hub::Configuration::InstanceLimit: two modules fit, a third is rejected with a
+// reason mentioning the limit, and a slot freed by deprovisioning can be reused.
+TEST_CASE("hub.instance_limit")
+{
+    ScopedTemporaryDirectory Scratch;
+
+    Hub::Configuration LimitedConfig;
+    LimitedConfig.InstanceLimit  = 2;
+    LimitedConfig.BasePortNumber = 21500;
+
+    const std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path(), LimitedConfig);
+
+    HubProvisionedInstanceInfo Provisioned;
+    std::string                FailureReason;
+
+    // Fill the hub up to its limit.
+    const bool FirstOk = TestHub->Provision("limit_a", Provisioned, FailureReason);
+    REQUIRE_MESSAGE(FirstOk, FailureReason);
+
+    const bool SecondOk = TestHub->Provision("limit_b", Provisioned, FailureReason);
+    REQUIRE_MESSAGE(SecondOk, FailureReason);
+    CHECK_EQ(TestHub->GetInstanceCount(), 2);
+
+    // One past the limit is refused and the count is unchanged.
+    FailureReason.clear();
+    const bool ThirdOk = TestHub->Provision("limit_c", Provisioned, FailureReason);
+    CHECK_FALSE(ThirdOk);
+    CHECK_EQ(TestHub->GetInstanceCount(), 2);
+    CHECK_NE(FailureReason.find("instance limit"), std::string::npos);
+
+    // Releasing one module makes room for another.
+    TestHub->Deprovision("limit_a", FailureReason);
+    CHECK_EQ(TestHub->GetInstanceCount(), 1);
+
+    FailureReason.clear();
+    const bool FourthOk = TestHub->Provision("limit_d", Provisioned, FailureReason);
+    CHECK_MESSAGE(FourthOk, FailureReason);
+    CHECK_EQ(TestHub->GetInstanceCount(), 2);
+}
+
+// Deprovisioning a module that was never provisioned must return false, leave the reason
+// string empty and leave the instance count at zero.
+TEST_CASE("hub.deprovision_nonexistent")
+{
+    ScopedTemporaryDirectory   Scratch;
+    const std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path());
+
+    std::string FailureReason;
+    const bool  Deprovisioned = TestHub->Deprovision("never_provisioned", FailureReason);
+
+    CHECK_FALSE(Deprovisioned);
+    CHECK(FailureReason.empty());
+    CHECK_EQ(TestHub->GetInstanceCount(), 0);
+}
+
+// EnumerateModules must visit exactly the currently provisioned modules: both after two
+// provisions, and only the remaining one after one of them is deprovisioned.
+TEST_CASE("hub.enumerate_modules")
+{
+    ScopedTemporaryDirectory   Scratch;
+    const std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path());
+
+    HubProvisionedInstanceInfo Provisioned;
+    std::string                FailureReason;
+
+    REQUIRE_MESSAGE(TestHub->Provision("enum_a", Provisioned, FailureReason), FailureReason);
+    REQUIRE_MESSAGE(TestHub->Provision("enum_b", Provisioned, FailureReason), FailureReason);
+
+    // Gathers the module ids reported by one enumeration pass.
+    const auto CollectModuleIds = [&TestHub] {
+        std::vector<std::string> Collected;
+        TestHub->EnumerateModules([&](StorageServerInstance& Instance) { Collected.push_back(std::string(Instance.GetModuleId())); });
+        return Collected;
+    };
+    const auto Contains = [](const std::vector<std::string>& Haystack, const char* Needle) {
+        return std::find(Haystack.begin(), Haystack.end(), Needle) != Haystack.end();
+    };
+
+    std::vector<std::string> ModuleIds = CollectModuleIds();
+    CHECK_EQ(ModuleIds.size(), 2u);
+    const bool SawA = Contains(ModuleIds, "enum_a");
+    const bool SawB = Contains(ModuleIds, "enum_b");
+    CHECK(SawA);
+    CHECK(SawB);
+
+    TestHub->Deprovision("enum_a", FailureReason);
+    ModuleIds = CollectModuleIds();
+    REQUIRE_EQ(ModuleIds.size(), 1u);
+    CHECK_EQ(ModuleIds[0], "enum_b");
+}
+
+// GetMaxInstanceCount starts at zero, is at least the number of modules provisioned so far,
+// and does not drop when a module is deprovisioned.
+TEST_CASE("hub.max_instance_count")
+{
+    ScopedTemporaryDirectory   Scratch;
+    const std::unique_ptr<Hub> TestHub = hub_testutils::MakeHub(Scratch.Path());
+
+    CHECK_EQ(TestHub->GetMaxInstanceCount(), 0);
+
+    HubProvisionedInstanceInfo Provisioned;
+    std::string                FailureReason;
+
+    REQUIRE_MESSAGE(TestHub->Provision("max_a", Provisioned, FailureReason), FailureReason);
+    CHECK_GE(TestHub->GetMaxInstanceCount(), 1);
+
+    REQUIRE_MESSAGE(TestHub->Provision("max_b", Provisioned, FailureReason), FailureReason);
+    CHECK_GE(TestHub->GetMaxInstanceCount(), 2);
+
+    // Remember the high-water mark, then shrink the live set.
+    const int HighWaterMark = TestHub->GetMaxInstanceCount();
+
+    TestHub->Deprovision("max_a", FailureReason);
+    CHECK_EQ(TestHub->GetInstanceCount(), 1);
+    CHECK_EQ(TestHub->GetMaxInstanceCount(), HighWaterMark);
+}
+
+// Provisions kHalf modules serially, then provisions kHalf new modules while deprovisioning
+// the original ones from two separate worker pools. Every operation must succeed and exactly
+// kHalf instances must remain afterwards.
+TEST_CASE("hub.concurrent")
+{
+ ScopedTemporaryDirectory TempDir;
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22000;
+ Config.InstanceLimit = 10;
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ constexpr int kHalf = 3;
+
+ // Serially pre-provision kHalf modules
+ for (int I = 0; I < kHalf; ++I)
+ {
+ HubProvisionedInstanceInfo Info;
+ std::string Reason;
+ REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
+
+ // Simultaneously:
+ // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N")
+ // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N")
+ // The two pools use distinct OS threads, so provisions and deprovisions are interleaved.
+
+ // Use int rather than bool to avoid std::vector<bool> bitfield packing,
+ // which would cause data races on concurrent per-index writes.
+ std::vector<int> ProvisionResults(kHalf, 0);
+ std::vector<std::string> ProvisionReasons(kHalf);
+ std::vector<int> DeprovisionResults(kHalf, 0);
+
+ {
+ // The pools are scoped to this block and so are destroyed before the results are inspected below.
+ WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners");
+ WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers");
+
+ std::vector<std::future<void>> ProvisionFutures(kHalf);
+ std::vector<std::future<void>> DeprovisionFutures(kHalf);
+
+ for (int I = 0; I < kHalf; ++I)
+ {
+ // Each task writes only to its own index I of the result vectors.
+ ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
+ HubProvisionedInstanceInfo Info;
+ std::string Reason;
+ const bool Result =
+ HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
+ ProvisionResults[I] = Result ? 1 : 0;
+ ProvisionReasons[I] = Reason;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ std::string Reason;
+ const bool Result =
+ HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
+ DeprovisionResults[I] = Result ? 1 : 0;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+
+ // Wait for every task to finish before leaving the scope that owns the pools.
+ for (std::future<void>& F : ProvisionFutures)
+ {
+ F.get();
+ }
+ for (std::future<void>& F : DeprovisionFutures)
+ {
+ F.get();
+ }
+ }
+
+ for (int I = 0; I < kHalf; ++I)
+ {
+ CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]);
+ CHECK(DeprovisionResults[I] != 0);
+ }
+ // Only the newly provisioned modules should remain
+ CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
+}
+
+// Same provision/deprovision interleaving as the plain concurrency test, but with recording
+// callbacks installed on the hub: after the concurrent phase there must be kHalf provision
+// callbacks (all for "new_*" modules) and kHalf deprovision callbacks (all for "pre_*"
+// modules), each carrying a non-zero port.
+TEST_CASE("hub.concurrent_callbacks")
+{
+ ScopedTemporaryDirectory TempDir;
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22300;
+ Config.InstanceLimit = 10;
+
+ struct CallbackRecord
+ {
+ std::string ModuleId;
+ uint16_t Port;
+ };
+ // Guards both record vectors; the callbacks are invoked from the worker threads below.
+ RwLock CallbackMutex;
+ std::vector<CallbackRecord> ProvisionCallbacks;
+ std::vector<CallbackRecord> DeprovisionCallbacks;
+
+ auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
+ CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ };
+ auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
+ CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb));
+
+ constexpr int kHalf = 3;
+
+ // Serially pre-provision kHalf modules and drain the resulting callbacks before the
+ // concurrent phase so we start with a clean slate.
+ for (int I = 0; I < kHalf; ++I)
+ {
+ HubProvisionedInstanceInfo Info;
+ std::string Reason;
+ REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
+
+ {
+ RwLock::ExclusiveLockScope _(CallbackMutex);
+ REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ ProvisionCallbacks.clear();
+ }
+
+ // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones.
+ // Results are stored as int (not bool) so each task writes to its own distinct vector element.
+ std::vector<int> ProvisionResults(kHalf, 0);
+ std::vector<std::string> ProvisionReasons(kHalf);
+ std::vector<int> DeprovisionResults(kHalf, 0);
+
+ {
+ WorkerThreadPool Provisioners(kHalf, "hub_cbtest_provisioners");
+ WorkerThreadPool Deprovisioneers(kHalf, "hub_cbtest_deprovisioneers");
+
+ std::vector<std::future<void>> ProvisionFutures(kHalf);
+ std::vector<std::future<void>> DeprovisionFutures(kHalf);
+
+ for (int I = 0; I < kHalf; ++I)
+ {
+ ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
+ HubProvisionedInstanceInfo Info;
+ std::string Reason;
+ const bool Result =
+ HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
+ ProvisionResults[I] = Result ? 1 : 0;
+ ProvisionReasons[I] = Reason;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ std::string Reason;
+ const bool Result =
+ HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
+ DeprovisionResults[I] = Result ? 1 : 0;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+
+ // Wait for every task to finish before leaving the scope that owns the pools.
+ for (std::future<void>& F : ProvisionFutures)
+ {
+ F.get();
+ }
+ for (std::future<void>& F : DeprovisionFutures)
+ {
+ F.get();
+ }
+ }
+
+ // All operations must have succeeded
+ for (int I = 0; I < kHalf; ++I)
+ {
+ CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]);
+ CHECK(DeprovisionResults[I] != 0);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
+
+ // Each new_* module must have triggered exactly one provision callback with a non-zero port.
+ // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port.
+ // NOTE(review): the checks below verify the total counts and the id prefixes, not per-module uniqueness.
+ {
+ RwLock::SharedLockScope _(CallbackMutex);
+ REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf));
+
+ for (const CallbackRecord& Record : ProvisionCallbacks)
+ {
+ CHECK_NE(Record.Port, 0);
+ const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0;
+ CHECK_MESSAGE(IsNewModule, Record.ModuleId);
+ }
+ for (const CallbackRecord& Record : DeprovisionCallbacks)
+ {
+ CHECK_NE(Record.Port, 0);
+ const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0;
+ CHECK_MESSAGE(IsPreModule, Record.ModuleId);
+ }
+ }
+}
+
+# if ZEN_PLATFORM_WINDOWS
+// Runs the same single-module provision/deprovision round trip with the Windows job object
+// enabled and disabled; each subcase uses its own temporary directory and base port.
+TEST_CASE("hub.job_object")
+{
+    // Provisions and then deprovisions ModuleId on a hub built with the given settings.
+    const auto RoundTrip = [](bool UseJobObject, uint16_t BasePort, const std::string& ModuleId) {
+        ScopedTemporaryDirectory Scratch;
+        Hub::Configuration       HubConfig;
+        HubConfig.UseJobObject   = UseJobObject;
+        HubConfig.BasePortNumber = BasePort;
+
+        std::unique_ptr<Hub>       TestHub = hub_testutils::MakeHub(Scratch.Path(), HubConfig);
+        HubProvisionedInstanceInfo Provisioned;
+        std::string                FailureReason;
+
+        const bool Provisioning = TestHub->Provision(ModuleId, Provisioned, FailureReason);
+        REQUIRE_MESSAGE(Provisioning, FailureReason);
+        CHECK_NE(Provisioned.Port, 0);
+
+        const bool Deprovisioning = TestHub->Deprovision(ModuleId, FailureReason);
+        CHECK(Deprovisioning);
+        CHECK_EQ(TestHub->GetInstanceCount(), 0);
+    };
+
+    SUBCASE("UseJobObject=true")
+    {
+        RoundTrip(true, 22100, "jobobj_a");
+    }
+
+    SUBCASE("UseJobObject=false")
+    {
+        RoundTrip(false, 22200, "nojobobj_a");
+    }
+}
+# endif // ZEN_PLATFORM_WINDOWS
+
+TEST_SUITE_END();
+
+// Intentionally empty.
+// NOTE(review): presumably referenced from elsewhere so the linker keeps this translation
+// unit and its test registrations - confirm against the test runner.
+void
+hub_forcelink()
+{
+    // nothing to do
+}
+
+#endif // ZEN_WITH_TESTS
} // namespace zen
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index caf8890ff..8b61f988d 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -22,33 +22,40 @@ class StorageServerInstance;
*
* Core logic for managing storage server instances on behalf of external clients.
*/
+
+/** Result of provisioning a storage server instance; also passed to the hub's
+ *  provision/deprovision callbacks.
+ */
+struct HubProvisionedInstanceInfo
+{
+    /** Base URI of the instance. The visible hub code never fills this in (see the TODOs in hub.cpp). */
+    std::string BaseUri;
+
+    /** Base port of the instance. Defaults to 0 so that a default-initialized object, such as the
+     *  out-parameter of a failed Provision call, never carries an indeterminate port.
+     */
+    uint16_t Port = 0;
+};
+
class Hub
{
public:
struct Configuration
{
- std::filesystem::path HubBaseDir;
- std::filesystem::path ChildBaseDir;
-
/** Enable or disable the use of a Windows Job Object for child process management.
* When enabled, all spawned child processes are assigned to a job object with
* JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
* crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
*/
- bool UseJobObject = true;
+ bool UseJobObject = true;
+ uint16_t BasePortNumber = 21000;
+
+ int InstanceLimit = 1000;
};
- Hub(const Configuration& Config);
+
+ typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc;
+
+ Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {},
+ ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {});
~Hub();
Hub(const Hub&) = delete;
Hub& operator=(const Hub&) = delete;
- struct ProvisionedInstanceInfo
- {
- std::string BaseUri;
- uint16_t Port;
- };
-
/**
* Provision a storage server instance for the given module ID.
*
@@ -56,7 +63,7 @@ public:
* @param OutInfo If successful, information about the provisioned instance will be returned here.
* @param OutReason If unsuccessful, the reason will be returned here.
*/
- bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason);
+ bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason);
/**
* Deprovision a storage server instance for the given module ID.
@@ -90,13 +97,20 @@ public:
int GetInstanceCount();
- int GetInstanceLimit() const { return m_InstanceLimit; }
int GetMaxInstanceCount() const { return m_MaxInstanceCount; }
+ const Configuration& GetConfig() const { return m_Config; }
+
private:
- ZenServerEnvironment m_RunEnvironment;
+ const Configuration m_Config;
+ ZenServerEnvironment m_RunEnvironment;
+
+ ProvisionModuleCallbackFunc m_ProvisionedModuleCallback;
+ ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback;
+
std::filesystem::path m_FileHydrationPath;
std::filesystem::path m_HydrationTempPath;
+
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
#endif
@@ -104,15 +118,17 @@ private:
std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
std::unordered_set<std::string> m_DeprovisioningModules;
std::unordered_set<std::string> m_ProvisioningModules;
- int m_MaxInstanceCount = 0;
- int m_InstanceLimit = 1000;
ResourceMetrics m_ResourceLimits;
SystemMetrics m_HostMetrics;
- bool m_UseJobObject = true;
+ int m_MaxInstanceCount = 0;
void UpdateStats();
void UpdateCapacityMetrics();
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
};
+#if ZEN_WITH_TESTS
+void hub_forcelink();
+#endif // ZEN_WITH_TESTS
+
} // namespace zen
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 64ab0bd9e..a019ff295 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -3,9 +3,10 @@
#include "zenhubserver.h"
#include "frontend/frontend.h"
+#include "httphubservice.h"
#include "hub.h"
-#include "hubservice.h"
+#include <zencore/config.h>
#include <zencore/fmtutils.h>
#include <zencore/memory/llm.h>
#include <zencore/memory/memorytrace.h>
@@ -38,6 +39,35 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Instance ID for use in notifications",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
+
+ Options.add_option("hub",
+ "",
+ "consul-endpoint",
+ "Consul endpoint URL for service registration (empty = disabled)",
+ cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-base-port-number",
+ "Base port number for provisioned instances",
+ cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-limit",
+ "Maximum number of provisioned instances for this hub",
+ cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"),
+ "");
+#if ZEN_PLATFORM_WINDOWS
+ Options.add_option("hub",
+ "",
+ "hub-use-job-object",
+ "Enable the use of a Windows Job Object for child process management",
+ cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"),
+ "");
+#endif // ZEN_PLATFORM_WINDOWS
}
void
@@ -74,6 +104,56 @@ ZenHubServer::~ZenHubServer()
Cleanup();
}
+// Hub callback invoked once a storage server instance has been provisioned for a module.
+// When Consul integration is active the instance is registered as a "zen-storage" service whose
+// service id is the module id, tagged with the module, the owning hub and the build version.
+// A failed registration is only logged; provisioning itself is not affected.
+void
+ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+{
+    if (!m_ConsulClient)
+    {
+        // Consul integration is not active, nothing to register
+        return;
+    }
+
+    std::vector<std::pair<std::string, std::string>> Tags;
+    Tags.emplace_back("module", std::string(ModuleId));
+    Tags.emplace_back("zen-hub", std::string(HubInstanceId));
+    Tags.emplace_back("version", std::string(ZEN_CFG_VERSION));
+
+    consul::ServiceRegistrationInfo Registration{.ServiceId   = std::string(ModuleId),
+                                                 .ServiceName = "zen-storage",
+                                                 // .Address = "localhost", // Let the consul agent figure out our external address //
+                                                 // TODO: Info.BaseUri?
+                                                 .Port                   = Info.Port,
+                                                 .HealthEndpoint         = "health",
+                                                 .Tags                   = std::move(Tags),
+                                                 .HealthIntervalSeconds  = 10,
+                                                 .DeregisterAfterSeconds = 30};
+
+    const bool Registered = m_ConsulClient->RegisterService(Registration);
+    if (Registered)
+    {
+        ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'",
+                 ModuleId,
+                 Info.Port,
+                 Registration.ServiceName);
+        return;
+    }
+
+    ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId);
+}
+
+// Hub callback invoked once a module's storage server instance has been deprovisioned.
+// When Consul integration is active the service registered under the module id is removed again.
+// A failed deregistration is only logged.
+void
+ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+{
+    ZEN_UNUSED(HubInstanceId);
+
+    if (!m_ConsulClient)
+    {
+        return;
+    }
+
+    const bool Deregistered = m_ConsulClient->DeregisterService(ModuleId);
+    if (Deregistered)
+    {
+        ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
+    }
+    else
+    {
+        ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway",
+                 ModuleId,
+                 Info.Port);
+    }
+}
+
int
ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
{
@@ -96,6 +176,7 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
InitializeState(ServerConfig);
+ InitializeConsulRegistration(ServerConfig, EffectiveBasePort);
InitializeServices(ServerConfig);
RegisterServices(ServerConfig);
@@ -127,6 +208,9 @@ ZenHubServer::Cleanup()
m_HubService.reset();
m_ApiService.reset();
m_Hub.reset();
+
+ m_ConsulRegistration.reset();
+ m_ConsulClient.reset();
}
catch (const std::exception& Ex)
{
@@ -147,7 +231,18 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
ZEN_INFO("instantiating Hub");
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"});
+ Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit},
+ ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"),
+ m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); }
+ : Hub::ProvisionModuleCallbackFunc{},
+ m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); }
+ : Hub::ProvisionModuleCallbackFunc{});
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
@@ -181,6 +276,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig)
}
+// Sets up Consul integration for the hub process itself.
+// Does nothing (apart from logging) when no Consul endpoint is configured. Otherwise creates the
+// Consul client and starts an asynchronous registration of this hub as service "zen-hub" with id
+// "zen-hub-<InstanceId>", listening on EffectivePort. Any exception thrown while setting this up
+// is logged and leaves the hub running without Consul integration.
void
+ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort)
+{
+    const std::string& Endpoint = ServerConfig.ConsulEndpoint;
+    if (Endpoint.empty())
+    {
+        ZEN_INFO("Consul registration disabled (no endpoint configured)");
+        return;
+    }
+
+    ZEN_INFO("Initializing Consul registration with endpoint: {}", Endpoint);
+
+    try
+    {
+        m_ConsulClient = std::make_unique<consul::ConsulClient>(Endpoint);
+
+        consul::ServiceRegistrationInfo HubService;
+        HubService.ServiceId   = fmt::format("zen-hub-{}", ServerConfig.InstanceId);
+        HubService.ServiceName = "zen-hub";
+        // HubService.Address is left empty: let the consul agent figure out our external address // TODO: Info.BaseUri?
+        HubService.Port           = static_cast<uint16_t>(EffectivePort);
+        HubService.HealthEndpoint = "hub/health";
+
+        // Publish the hub's provisioning settings as tags alongside its identity and version
+        HubService.Tags.emplace_back("zen-hub", HubService.ServiceId);
+        HubService.Tags.emplace_back("version", std::string(ZEN_CFG_VERSION));
+        HubService.Tags.emplace_back("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber));
+        HubService.Tags.emplace_back("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit));
+        HubService.Tags.emplace_back("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject));
+
+        m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), HubService);
+
+        ZEN_INFO("Consul service registration initiated for service ID: {}", HubService.ServiceId);
+    }
+    catch (const std::exception& Ex)
+    {
+        // REQ-F-12: Hub should start successfully even if Consul registration fails
+        ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what());
+
+        // The registration refers to the client, so it is released first
+        m_ConsulRegistration.reset();
+        m_ConsulClient.reset();
+    }
+}
+
+void
ZenHubServer::Run()
{
if (m_ProcessMonitor.IsActive())
@@ -228,6 +364,21 @@ ZenHubServer::Run()
const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+    // Give the asynchronous Consul registration a bounded amount of time (2 seconds) to complete
+    // before the hub reports itself as running. If it has not registered by then, Consul
+    // integration is dropped for the lifetime of this process.
+    if (m_ConsulRegistration)
+    {
+        if (!m_ConsulRegistration->IsRegistered())
+        {
+            ZEN_INFO("Waiting for consul integration to register...");
+            m_ConsulRegistration->WaitForReadyEvent(2000);
+        }
+        if (!m_ConsulRegistration->IsRegistered())
+        {
+            // The registration must be destroyed BEFORE the client: it only holds a raw pointer to
+            // the client, its background thread may still be calling RegisterService() through that
+            // pointer, and its destructor (which joins the thread) may call DeregisterService().
+            // Releasing the client first would leave both using a dangling pointer.
+            m_ConsulRegistration.reset();
+            m_ConsulClient.reset();
+            ZEN_WARN("Consul registration failed, running without consul integration");
+        }
+    }
+
SetNewState(kRunning);
OnReady();
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index a3eeb2ead..f6a3eb1bc 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -4,6 +4,8 @@
#include "zenserver.h"
+#include <zenutil/consul.h>
+
namespace cxxopts {
class Options;
}
@@ -20,10 +22,15 @@ class HttpHubService;
+// Configuration for zenserver hub mode; extends the common server configuration.
struct ZenHubServerConfig : public ZenServerConfig
{
std::string UpstreamNotificationEndpoint;
- std::string InstanceId; // For use in notifications
+ std::string InstanceId; // For use in notifications
+ std::string ConsulEndpoint; // If set, enables Consul service registration
+ uint16_t HubBasePortNumber = 21000; // Base port number for provisioned instances
+ int HubInstanceLimit = 1000; // Maximum number of provisioned instances for this hub
+ bool HubUseJobObject = true; // Use a Windows Job Object for child process management (the CLI option is only registered on Windows)
};
class Hub;
+struct HubProvisionedInstanceInfo;
struct ZenHubServerConfigurator : public ZenServerConfiguratorBase
{
@@ -78,6 +85,9 @@ public:
void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
private:
+ void OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info);
+ void OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info);
+
bool m_IsDedicatedMode = false;
bool m_TestMode = false;
std::filesystem::path m_DataRoot;
@@ -90,9 +100,13 @@ private:
std::unique_ptr<HttpApiService> m_ApiService;
std::unique_ptr<HttpFrontendService> m_FrontendService;
+ std::unique_ptr<consul::ConsulClient> m_ConsulClient;
+ std::unique_ptr<consul::ServiceRegistration> m_ConsulRegistration;
+
void InitializeState(const ZenHubServerConfig& ServerConfig);
void InitializeServices(const ZenHubServerConfig& ServerConfig);
void RegisterServices(const ZenHubServerConfig& ServerConfig);
+ void InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort);
};
} // namespace zen
diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp
index 423c9039c..a3807675c 100644
--- a/src/zenserver/storage/upstream/zen.cpp
+++ b/src/zenserver/storage/upstream/zen.cpp
@@ -63,7 +63,7 @@ ZenStructuredCacheSession::CheckHealth()
return {.Bytes = Response.DownloadedBytes,
.ElapsedSeconds = Response.ElapsedSeconds,
- .Success = Response.StatusCode == HttpResponseCode::OK};
+ .Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent};
}
ZenCacheResult
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index 6ddebf97a..272096ebb 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -2,11 +2,14 @@
#include <zenutil/consul.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/process.h>
#include <zencore/string.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <fmt/format.h>
@@ -30,7 +33,8 @@ struct ConsulProcess::Impl
CreateProcOptions Options;
Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup;
- CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
+ const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL);
+ CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options);
if (Result)
{
@@ -107,9 +111,9 @@ ConsulClient::~ConsulClient()
void
ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value)
{
- IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
- HttpClient::Response Result =
- m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}});
+ IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value));
+ ValueBuffer.SetContentType(HttpContentType::kText);
+ HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, HttpClient::Accept(HttpContentType::kJSON));
if (!Result)
{
throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage(""));
@@ -137,4 +141,243 @@ ConsulClient::DeleteKey(std::string_view Key)
}
}
+// Registers a service with the Consul agent via PUT v1/agent/service/register.
+//
+// The payload carries the service id, name, port, optional address, the tags (each key/value pair
+// flattened to a single "key:value" string) and one HTTP health check built from the address, port
+// and health endpoint, together with the check interval and the deregister-after-critical timeout.
+//
+// Returns true when the request succeeded. On failure a warning is logged and false is returned.
+bool
+ConsulClient::RegisterService(const ServiceRegistrationInfo& Info)
+{
+    using namespace std::literals;
+
+    // The service address may be left empty so the agent fills it in, but the health check still
+    // needs a well-formed URL: an "http" URL with an empty host ("http://:port/...") is not valid.
+    // Fall back to localhost for the check only (the integration targets a host-local agent).
+    const std::string_view CheckHost = Info.Address.empty() ? "localhost"sv : std::string_view(Info.Address);
+
+    CbObjectWriter Writer;
+    {
+        Writer.AddString("ID"sv, Info.ServiceId);
+        Writer.AddString("Name"sv, Info.ServiceName);
+        if (!Info.Address.empty())
+        {
+            Writer.AddString("Address"sv, Info.Address);
+        }
+        Writer.AddInteger("Port"sv, Info.Port);
+        if (!Info.Tags.empty())
+        {
+            Writer.BeginArray("Tags"sv);
+            for (const std::pair<std::string, std::string>& Tag : Info.Tags)
+            {
+                Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second));
+            }
+            Writer.EndArray();  // Tags
+        }
+        Writer.BeginObject("Check"sv);
+        {
+            Writer.AddString("HTTP"sv, fmt::format("http://{}:{}/{}", CheckHost, Info.Port, Info.HealthEndpoint));
+            Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds));
+            Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds));
+        }
+        Writer.EndObject();  // Check
+    }
+
+    // The payload is built as compact binary and sent as JSON
+    ExtendableStringBuilder<512> SB;
+    CompactBinaryToJson(Writer.Save(), SB);
+
+    // The buffer wraps SB's storage, so SB has to stay alive until the request has been sent
+    IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size());
+    PayloadBuffer.SetContentType(HttpContentType::kJSON);
+    HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, HttpClient::Accept(HttpContentType::kJSON));
+
+    if (!Result)
+    {
+        ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage(""));
+        return false;
+    }
+
+    return true;
+}
+
+// Removes the service with the given id from the Consul agent via
+// PUT v1/agent/service/deregister/<ServiceId>.
+// Returns true when the request succeeded; a failure is logged as a warning and yields false.
+bool
+ConsulClient::DeregisterService(std::string_view ServiceId)
+{
+    const std::string    RequestPath = fmt::format("v1/agent/service/deregister/{}", ServiceId);
+    HttpClient::Response Response    = m_HttpClient.Put(RequestPath, HttpClient::Accept(HttpContentType::kJSON));
+
+    const bool RequestFailed = !Response;
+    if (RequestFailed)
+    {
+        ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Response.ErrorMessage(""));
+    }
+
+    return !RequestFailed;
+}
+
+// Returns true when the given JSON text contains a service entry whose "ID" field equals ServiceId.
+// The text is parsed into compact binary; each top-level field is treated as an object whose members
+// are service objects (the callers pass in the body of a v1/agent/services response).
+// Input that fails to parse yields false.
+bool
+ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId)
+{
+    using namespace std::literals;
+
+    std::string     ParseError;
+    CbFieldIterator Document = LoadCompactBinaryFromJson(Json, ParseError);
+    if (!Document || !ParseError.empty())
+    {
+        return false;
+    }
+
+    for (CbFieldView TopLevelField : Document)
+    {
+        CbObjectView Services = TopLevelField.AsObjectView();
+        if (!Services)
+        {
+            continue;
+        }
+
+        for (CbFieldView ServiceField : Services)
+        {
+            CbObjectView Service = ServiceField.AsObjectView();
+            if (Service["ID"sv].AsString() == ServiceId)
+            {
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+// Queries v1/agent/services and reports whether a service with the given id is listed.
+// A failed request is reported as "not present" (false).
+bool
+ConsulClient::HasService(std::string_view ServiceId)
+{
+    HttpClient::Response Response = m_HttpClient.Get("v1/agent/services");
+
+    const bool RequestFailed = !Response;
+    return RequestFailed ? false : FindServiceInJson(Response.AsText(), ServiceId);
+}
+
+// Issues a query on v1/agent/services with "index" and "wait" parameters and reports whether
+// ServiceId is present in the response.
+//
+// InOutIndex is sent as the "index" parameter and, when the response carries a parsable
+// X-Consul-Index header, updated to that value. It is left untouched when the request fails or
+// the header is missing or unparsable. A failed request returns false.
+bool
+ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds)
+{
+    // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); how long the call may block is
+    // bounded on the server side by the WaitSeconds parameter. Do not add a separate client timeout.
+    HttpClient::KeyValueMap Query({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}});
+
+    HttpClient::Response Response = m_HttpClient.Get("v1/agent/services", {}, Query);
+    if (!Response)
+    {
+        return false;
+    }
+
+    if (auto HeaderIt = Response.Header->find("X-Consul-Index"); HeaderIt != Response.Header->end())
+    {
+        if (std::optional<uint64_t> NewIndex = ParseInt<uint64_t>(HeaderIt->second); NewIndex.has_value())
+        {
+            InOutIndex = NewIndex.value();
+        }
+    }
+
+    return FindServiceInJson(Response.AsText(), ServiceId);
+}
+
+// Returns the body of a v1/agent/services query as text.
+// When the request fails an empty JSON object ("{}") is returned instead.
+std::string
+ConsulClient::GetAgentServicesJson()
+{
+    HttpClient::Response Response = m_HttpClient.Get("v1/agent/services");
+
+    const bool RequestFailed = !Response;
+    if (RequestFailed)
+    {
+        return "{}";
+    }
+
+    return Response.ToText();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Stores the client pointer (not owned) and a copy of the registration info, then starts the
+// background thread that runs RegistrationLoop().
+ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info)
+{
+ // The thread is started in the constructor body rather than in the initializer list: the events
+ // used by RegistrationLoop() are declared after m_RegistrationThread, so starting the thread here
+ // guarantees every member has been constructed before the loop can touch it.
+ m_RegistrationThread = std::thread(&ServiceRegistration::RegistrationLoop, this);
+}
+
+// Stops the background registration thread and, if the service was registered, deregisters it.
+// Exceptions are caught and logged so that nothing escapes the destructor.
+ServiceRegistration::~ServiceRegistration()
+{
+ try
+ {
+ // Signal the registration loop so that any backoff/retry wait it is in returns and the thread exits
+ m_StopEvent.Set();
+
+ // Join before looking at m_IsRegistered: once the thread has finished the flag can no longer
+ // change and the thread no longer uses m_Client
+ if (m_RegistrationThread.joinable())
+ {
+ m_RegistrationThread.join();
+ }
+
+ if (m_IsRegistered.load())
+ {
+ // m_Client is dereferenced here, so the client must still be alive when this object is destroyed
+ if (!m_Client->DeregisterService(m_Info.ServiceId))
+ {
+ ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what());
+ }
+}
+
+// Waits on the ready event for at most MaxWaitMS unless the service is already registered.
+// The outcome is not returned; callers are expected to check IsRegistered() afterwards.
+void
+ServiceRegistration::WaitForReadyEvent(int MaxWaitMS)
+{
+    const bool AlreadyRegistered = m_IsRegistered.load();
+    if (!AlreadyRegistered)
+    {
+        m_ReadyWakeupEvent.Wait(MaxWaitMS);
+    }
+}
+
+// Thread entry point: keeps trying to register m_Info with Consul until it succeeds or a stop is
+// requested through m_StopEvent.
+//
+// Each round makes up to MaxAttempts + 1 registration calls (one before each backoff wait and one
+// after the last wait). If the whole round fails, a warning is logged and the next round starts
+// after RetryIntervalMs. On success m_IsRegistered is set and m_ReadyWakeupEvent is signalled.
+// Exceptions are caught and logged so that none leaves the thread function.
+void
+ServiceRegistration::RegistrationLoop()
+{
+ try
+ {
+ SetCurrentThreadName("ConsulRegistration");
+
+ // Exponential backoff delays: 100ms, 200ms, 400ms
+ constexpr int BackoffDelaysMs[] = {100, 200, 400};
+ constexpr int MaxAttempts = 3;
+ constexpr int RetryIntervalMs = 5000;
+
+ while (true)
+ {
+ bool Succeeded = false;
+ // Try to register with exponential backoff
+ for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt)
+ {
+ if (m_Client->RegisterService(m_Info))
+ {
+ Succeeded = true;
+ break;
+ }
+
+ // A true result from Wait is treated as a stop request: leave without registering
+ if (m_StopEvent.Wait(BackoffDelaysMs[Attempt]))
+ {
+ return;
+ }
+ }
+
+ // If the backoff attempts all failed, make one more attempt after the final backoff wait
+ if (Succeeded || m_Client->RegisterService(m_Info))
+ {
+ break;
+ }
+
+ // All attempts failed, log warning and wait before retrying
+ ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s",
+ m_Info.ServiceId,
+ MaxAttempts + 1,
+ RetryIntervalMs / 1000);
+
+ // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor
+ // wakes this immediately so the thread exits without polling.
+ if (m_StopEvent.Wait(RetryIntervalMs))
+ {
+ return;
+ }
+ }
+
+ // Only reached after a successful registration: publish the state, then wake any waiter
+ ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId);
+ m_IsRegistered.store(true);
+ m_ReadyWakeupEvent.Set();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what());
+ }
+}
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h
index 08871fa66..b5bfa42d1 100644
--- a/src/zenutil/include/zenutil/consul.h
+++ b/src/zenutil/include/zenutil/consul.h
@@ -5,11 +5,26 @@
#include <zenbase/zenbase.h>
#include <zenhttp/httpclient.h>
+#include <atomic>
+#include <cstdint>
#include <string>
#include <string_view>
+#include <thread>
namespace zen::consul {
+// Describes a service to be registered with the Consul agent (see ConsulClient::RegisterService).
+struct ServiceRegistrationInfo
+{
+ std::string ServiceId; // Sent as "ID"; ServiceRegistration also uses it to deregister the service
+ std::string ServiceName; // Sent as "Name"
+ std::string Address; // Sent as "Address"; left out of the request when empty
+ uint16_t Port = 0; // Sent as "Port" and used in the health check URL
+ std::string HealthEndpoint; // Path appended to the HTTP health check URL (written without a leading '/')
+ std::vector<std::pair<std::string, std::string>> Tags; // Each pair is sent as one "key:value" tag string
+ int HealthIntervalSeconds = 10; // Health check "Interval", in seconds
+ int DeregisterAfterSeconds = 30; // Health check "DeregisterCriticalServiceAfter", in seconds
+};
+
class ConsulClient
{
public:
@@ -23,7 +38,22 @@ public:
std::string GetKeyValue(std::string_view Key);
void DeleteKey(std::string_view Key);
+ bool RegisterService(const ServiceRegistrationInfo& Info);
+ bool DeregisterService(std::string_view ServiceId);
+
+ // Query methods for testing
+ bool HasService(std::string_view ServiceId);
+ std::string GetAgentServicesJson();
+
+ // Blocking query on v1/agent/services. Blocks until the service list changes or
+ // the wait period expires. InOutIndex must be 0 for the first call; it is updated
+ // to the new X-Consul-Index value on success (left unchanged on error).
+ // Returns true if ServiceId is present in the response.
+ bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds);
+
private:
+ static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId);
+
HttpClient m_HttpClient;
};
@@ -44,4 +74,36 @@ private:
std::unique_ptr<Impl> m_Impl;
};
+/**
+ * RAII wrapper for Consul service registration.
+ *
+ * Manages the lifecycle of a Consul service registration with:
+ * - Async registration in a background thread
+ * - Exponential backoff retry on failure (100ms, 200ms, 400ms)
+ * - Continuous background retry every 5 seconds after initial failures until registration succeeds
+ * - Automatic deregistration on destruction
+ *
+ * The ConsulClient is not owned. It is used by the background thread and by the destructor,
+ * so it must stay alive until this object has been destroyed.
+ */
+class ServiceRegistration
+{
+public:
+ // Starts the background registration thread immediately
+ ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info);
+ // Stops and joins the background thread, then deregisters the service if it was registered
+ ~ServiceRegistration();
+
+ ServiceRegistration(const ServiceRegistration&) = delete;
+ ServiceRegistration& operator=(const ServiceRegistration&) = delete;
+
+ // True once the background thread has successfully registered the service
+ bool IsRegistered() const { return m_IsRegistered.load(); }
+ // Waits up to MaxWaitMS for the registration to complete; check IsRegistered() for the outcome
+ void WaitForReadyEvent(int MaxWaitMS);
+
+private:
+ ConsulClient* m_Client; // Non-owning
+ ServiceRegistrationInfo m_Info; // Copy of the registration data
+ std::atomic<bool> m_IsRegistered{false}; // Set by the registration thread on success
+ std::thread m_RegistrationThread; // Runs RegistrationLoop()
+ Event m_ReadyWakeupEvent; // Signalled after m_IsRegistered has been set
+ Event m_StopEvent; // Signalled by the destructor to end the registration loop
+
+ void RegistrationLoop();
+};
+
} // namespace zen::consul
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 2a8617162..1b8750628 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -29,9 +29,34 @@ class CbObject;
class ZenServerEnvironment
{
public:
+ enum EStorageTag
+ {
+ Storage
+ };
+ enum EHubTag
+ {
+ Hub
+ };
+ enum ETestTag
+ {
+ Test
+ };
+
ZenServerEnvironment();
~ZenServerEnvironment();
+ ZenServerEnvironment(ZenServerEnvironment&&);
+
+ ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir);
+ ZenServerEnvironment(EHubTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass = "");
+ ZenServerEnvironment(ETestTag,
+ std::filesystem::path ProgramBaseDir,
+ std::filesystem::path TestBaseDir,
+ std::string_view ServerClass = "");
+
void Initialize(std::filesystem::path ProgramBaseDir);
void InitializeForTest(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = "");
void InitializeForHub(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = "");
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index b09c2d89a..b9c50be4f 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -503,6 +503,39 @@ ZenServerEnvironment::~ZenServerEnvironment()
{
}
+// Move constructor: takes over the program/child-process base directories and the server class
+// from Other and copies the remaining flags. The next-port counter is initialized from a load()
+// of Other's counter (presumably a std::atomic, which cannot itself be moved - confirm against the
+// member declaration). Other's flags and counter are left as they were.
+ZenServerEnvironment::ZenServerEnvironment(ZenServerEnvironment&& Other)
+: m_ProgramBaseDir(std::move(Other.m_ProgramBaseDir))
+, m_ChildProcessBaseDir(std::move(Other.m_ChildProcessBaseDir))
+, m_IsInitialized(Other.m_IsInitialized)
+, m_IsTestInstance(Other.m_IsTestInstance)
+, m_IsHubInstance(Other.m_IsHubInstance)
+, m_PassthroughOutput(Other.m_PassthroughOutput)
+, m_ServerClass(std::move(Other.m_ServerClass))
+, m_NextPortNumber(Other.m_NextPortNumber.load())
+{
+}
+
+// Storage-tag constructor: equivalent to default construction followed by Initialize(ProgramBaseDir).
+ZenServerEnvironment::ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir)
+{
+    // ProgramBaseDir is a by-value sink parameter and Initialize() takes its argument by value too,
+    // so hand the path on instead of copying it a second time
+    Initialize(std::move(ProgramBaseDir));
+}
+
+// Hub-tag constructor: equivalent to default construction followed by
+// InitializeForHub(ProgramBaseDir, TestBaseDir, ServerClass).
+ZenServerEnvironment::ZenServerEnvironment(EHubTag,
+                                           std::filesystem::path ProgramBaseDir,
+                                           std::filesystem::path TestBaseDir,
+                                           std::string_view      ServerClass)
+{
+    // Both paths are by-value sink parameters and InitializeForHub() takes them by value as well,
+    // so hand them on instead of copying them a second time
+    InitializeForHub(std::move(ProgramBaseDir), std::move(TestBaseDir), ServerClass);
+}
+
+// Test-tag constructor: equivalent to default construction followed by
+// InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass).
+ZenServerEnvironment::ZenServerEnvironment(ETestTag,
+                                           std::filesystem::path ProgramBaseDir,
+                                           std::filesystem::path TestBaseDir,
+                                           std::string_view      ServerClass)
+{
+    // Both paths are by-value sink parameters and InitializeForTest() takes them by value as well,
+    // so hand them on instead of copying them a second time
+    InitializeForTest(std::move(ProgramBaseDir), std::move(TestBaseDir), ServerClass);
+}
+
void
ZenServerEnvironment::Initialize(std::filesystem::path ProgramBaseDir)
{