diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-11 09:45:31 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-11 09:45:31 +0100 |
| commit | e9d04250225a430cffed28e6a49299e3da542f97 (patch) | |
| tree | 7f99594bd930c072e159458319e822bbb18794f7 /src | |
| parent | minor zenstore/blockstore fixes (#821) (diff) | |
| download | zen-e9d04250225a430cffed28e6a49299e3da542f97.tar.xz zen-e9d04250225a430cffed28e6a49299e3da542f97.zip | |
hub consul integration (#820)
- Feature: Basic Consul integration for zenserver hub mode, restricted to the host-local Consul agent and to registering/deregistering services
- Feature: Added new options to zenserver hub mode
  - `consul-endpoint` - Consul endpoint URL for service registration (empty = disabled)
  - `hub-base-port-number` - Base port number for provisioned instances
  - `hub-instance-limit` - Maximum number of provisioned instances for this hub
  - `hub-use-job-object` - Enable the use of a Windows Job Object for child process management (Windows only)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 137 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp (renamed from src/zenserver/hub/hubservice.cpp) | 6 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.h (renamed from src/zenserver/hub/hubservice.h) | 0 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 507 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 50 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 155 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 16 | ||||
| -rw-r--r-- | src/zenserver/storage/upstream/zen.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/consul/consul.cpp | 251 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/consul.h | 62 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 25 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 33 |
12 files changed, 1200 insertions, 44 deletions
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index 11531e30f..9c1669256 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -19,6 +19,8 @@ # include <zenutil/zenserverprocess.h> # include <zenhttp/httpclient.h> # include <zenutil/consul.h> +# include <zencore/thread.h> +# include <zencore/timer.h> namespace zen::tests::hub { @@ -230,12 +232,33 @@ TEST_CASE("hub.lifecycle.children") } } -TEST_CASE("hub.consul.lifecycle" * doctest::skip()) +static bool +WaitForConsulService(consul::ConsulClient& Client, std::string_view ServiceId, bool ExpectedState, int TimeoutMs) { - zen::consul::ConsulProcess ConsulProc; + Stopwatch Timer; + uint64_t Index = 0; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + uint64_t RemainingMs = static_cast<uint64_t>(TimeoutMs) - Timer.GetElapsedTimeMs(); + int WaitSeconds = std::max(1, static_cast<int>(RemainingMs / 1000)); + if (Client.WatchService(ServiceId, Index, WaitSeconds) == ExpectedState) + { + return true; + } + if (Index == 0) + { + Sleep(100); // error path only: avoid tight loop on persistent connection failure + } + } + return Client.HasService(ServiceId) == ExpectedState; +} + +TEST_CASE("hub.consul.kv") +{ + consul::ConsulProcess ConsulProc; ConsulProc.SpawnConsulAgent(); - zen::consul::ConsulClient Client("http://localhost:8500/"); + consul::ConsulClient Client("http://localhost:8500/"); Client.SetKeyValue("zen/hub/testkey", "testvalue"); std::string RetrievedValue = Client.GetKeyValue("zen/hub/testkey"); @@ -246,6 +269,114 @@ TEST_CASE("hub.consul.lifecycle" * doctest::skip()) ConsulProc.StopConsulAgent(); } +TEST_CASE("hub.consul.hub.registration") +{ + consul::ConsulProcess ConsulProc; + ConsulProc.SpawnConsulAgent(); + + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ 
--instance-id=test-instance"); + REQUIRE(PortNumber != 0); + + consul::ConsulClient Client("http://localhost:8500/"); + + REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000)); + + Instance.Shutdown(); + + CHECK(!Client.HasService("zen-hub-test-instance")); + + ConsulProc.StopConsulAgent(); +} + +TEST_CASE("hub.consul.provision.registration") +{ + consul::ConsulProcess ConsulProc; + ConsulProc.SpawnConsulAgent(); + + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ --instance-id=test-instance"); + REQUIRE(PortNumber != 0); + + consul::ConsulClient Client("http://localhost:8500/"); + + REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000)); + + HttpClient HubClient(Instance.GetBaseUri() + "/hub/"); + + HttpClient::Response Result = HubClient.Post("modules/testmod/provision"); + REQUIRE(Result); + + CHECK(Client.HasService("testmod")); + { + const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0); + REQUIRE(ModulePort != 0); + + std::string JsonError; + CbFieldIterator ServicesRoot = LoadCompactBinaryFromJson(Client.GetAgentServicesJson(), JsonError); + REQUIRE(JsonError.empty()); + + CbObjectView ServicesMap; + for (CbFieldView F : ServicesRoot) + { + if (F.IsObject()) + { + ServicesMap = F.AsObjectView(); + } + } + REQUIRE(ServicesMap); + + // Verify fields registered by OnProvisioned + { + CbObjectView ModService = ServicesMap["testmod"].AsObjectView(); + CHECK_EQ(ModService["ID"sv].AsString(), "testmod"sv); + CHECK_EQ(ModService["Service"sv].AsString(), "zen-storage"sv); + CHECK_EQ(ModService["Port"sv].AsDouble(0), double(ModulePort)); + bool FoundModuleTag = false; + bool FoundHubTag = false; + bool FoundVersionTag = false; + for (CbFieldView Tag : ModService["Tags"].AsArrayView()) + { + std::string_view TagStr = Tag.AsString(); + if (TagStr == "module:testmod"sv) + { + 
FoundModuleTag = true; + } + else if (TagStr == "zen-hub:zen-hub-test-instance"sv) + { + FoundHubTag = true; + } + else if (TagStr.substr(0, 8) == "version:"sv) + { + FoundVersionTag = true; + } + } + CHECK(FoundModuleTag); + CHECK(FoundHubTag); + CHECK(FoundVersionTag); + } + + // Verify fields registered by InitializeConsulRegistration + { + CbObjectView HubService = ServicesMap["zen-hub-test-instance"].AsObjectView(); + CHECK_EQ(HubService["ID"sv].AsString(), "zen-hub-test-instance"sv); + CHECK_EQ(HubService["Service"sv].AsString(), "zen-hub"sv); + CHECK_EQ(HubService["Port"sv].AsDouble(0), double(PortNumber)); + } + } + + Result = HubClient.Post("modules/testmod/deprovision"); + REQUIRE(Result); + + CHECK(!Client.HasService("testmod")); + + Instance.Shutdown(); + + ConsulProc.StopConsulAgent(); +} + TEST_SUITE_END(); } // namespace zen::tests::hub diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 6765340dc..67ed0cfd8 100644 --- a/src/zenserver/hub/hubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. 
-#include "hubservice.h" +#include "httphubservice.h" #include "hub.h" #include "storageserverinstance.h" @@ -74,7 +74,7 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) try { - Hub::ProvisionedInstanceInfo Info; + HubProvisionedInstanceInfo Info; if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) { CbObjectWriter Obj; @@ -144,7 +144,7 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) CbObjectWriter Obj; Obj << "currentInstanceCount" << m_Hub.GetInstanceCount(); Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount(); - Obj << "instanceLimit" << m_Hub.GetInstanceLimit(); + Obj << "instanceLimit" << m_Hub.GetConfig().InstanceLimit; Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); }, HttpVerb::kGet); diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/httphubservice.h index d08eeea2a..d08eeea2a 100644 --- a/src/zenserver/hub/hubservice.h +++ b/src/zenserver/hub/httphubservice.h diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index f1cc86de6..c9720b32d 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -15,6 +15,13 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include <zencore/filesystem.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +#endif + namespace zen { /////////////////////////////////////////////////////////////////////////// @@ -111,14 +118,19 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject) +Hub::Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + ProvisionModuleCallbackFunc&& ProvisionedModuleCallback, + ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback) +: m_Config(Config) +, m_RunEnvironment(std::move(RunEnvironment)) +, 
m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback)) +, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback)) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; - m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir); - m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); @@ -128,13 +140,10 @@ Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject) // This is necessary to ensure the hub assigns a distinct port range. // We need to do this primarily because otherwise automated tests will // fail as the test runner will create processes in the default range. - // We should probably make this configurable or dynamic for maximum - // flexibility, and to allow running multiple hubs on the same host if - // necessary. - m_RunEnvironment.SetNextPortNumber(21000); + m_RunEnvironment.SetNextPortNumber(m_Config.BasePortNumber); #if ZEN_PLATFORM_WINDOWS - if (m_UseJobObject) + if (m_Config.UseJobObject) { m_JobObject.Initialize(); if (m_JobObject.IsValid()) @@ -158,6 +167,20 @@ Hub::~Hub() m_Lock.WithExclusiveLock([this] { for (auto& [ModuleId, Instance] : m_Instances) { + uint16_t BasePort = Instance->GetBasePort(); + std::string BaseUri; // TODO? + + if (m_DeprovisionedModuleCallback) + { + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for module {} failed. 
Reason: '{}'", ModuleId, Ex.what()); + } + } Instance->Deprovision(); } m_Instances.clear(); @@ -170,7 +193,7 @@ Hub::~Hub() } bool -Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) +Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) { StorageServerInstance* Instance = nullptr; bool IsNewInstance = false; @@ -241,9 +264,20 @@ Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std: } OutInfo.Port = Instance->GetBasePort(); - // TODO: base URI? Would need to know what host name / IP to use + if (m_ProvisionedModuleCallback) + { + try + { + m_ProvisionedModuleCallback(ModuleId, OutInfo); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } + return true; } @@ -279,6 +313,21 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) } } + uint16_t BasePort = Instance->GetBasePort(); + std::string BaseUri; // TODO? + + if (m_DeprovisionedModuleCallback) + { + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } + // The module is deprovisioned outside the lock to avoid blocking other operations. 
// // To ensure that no new provisioning can occur while we're deprovisioning, @@ -362,9 +411,9 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return false; } - if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) + if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit) { - OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); + OutReason = fmt::format("instance limit exceeded ({})", m_Config.InstanceLimit); return false; } @@ -374,6 +423,438 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return true; } -/////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + +TEST_SUITE_BEGIN("server.hub"); + +namespace hub_testutils { + + ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) + { + return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); + } + + std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, + Hub::Configuration Config = {}, + Hub::ProvisionModuleCallbackFunc ProvisionCallback = {}, + Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {}) + { + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback)); + } + +} // namespace hub_testutils + +TEST_CASE("hub.provision_basic") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("module_a")); + + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + CHECK(HubInstance->Find("module_a")); + + const bool DeprovisionResult = HubInstance->Deprovision("module_a", 
Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("module_a")); +} + +TEST_CASE("hub.provision_callbacks") +{ + ScopedTemporaryDirectory TempDir; + + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionRecords; + std::vector<CallbackRecord> DeprovisionRecords; + + auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); }); + }; + auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb)); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionRecords.size(), 1u); + CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module"); + CHECK_EQ(ProvisionRecords[0].Port, Info.Port); + CHECK_NE(ProvisionRecords[0].Port, 0); + } + + const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); + CHECK(DeprovisionResult); + + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(DeprovisionRecords.size(), 1u); + CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); + CHECK_NE(DeprovisionRecords[0].Port, 0); + CHECK_EQ(ProvisionRecords.size(), 1u); + } +} + +TEST_CASE("hub.instance_limit") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.InstanceLimit = 2; + Config.BasePortNumber = 21500; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), 
Config); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason); + REQUIRE_MESSAGE(FirstResult, Reason); + + const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason); + REQUIRE_MESSAGE(SecondResult, Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); + + Reason.clear(); + const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason); + CHECK_FALSE(ThirdResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); + CHECK_NE(Reason.find("instance limit"), std::string::npos); + + HubInstance->Deprovision("limit_a", Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + + Reason.clear(); + const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason); + CHECK_MESSAGE(FourthResult, Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); +} + +TEST_CASE("hub.deprovision_nonexistent") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + std::string Reason; + const bool Result = HubInstance->Deprovision("never_provisioned", Reason); + CHECK_FALSE(Result); + CHECK(Reason.empty()); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); +} + +TEST_CASE("hub.enumerate_modules") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason); + REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); + + std::vector<std::string> Ids; + HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + CHECK_EQ(Ids.size(), 2u); + const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); + const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); + CHECK(FoundA); + CHECK(FoundB); + + 
HubInstance->Deprovision("enum_a", Reason); + Ids.clear(); + HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "enum_b"); +} + +TEST_CASE("hub.max_instance_count") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason); + CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); + + REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason); + CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); + + HubInstance->Deprovision("max_a", Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); +} + +TEST_CASE("hub.concurrent") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22000; + Config.InstanceLimit = 10; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + constexpr int kHalf = 3; + + // Serially pre-provision kHalf modules + for (int I = 0; I < kHalf; ++I) + { + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + // Simultaneously: + // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N") + // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N") + // The two pools use distinct OS threads, so provisions and deprovisions are interleaved. + + // Use int rather than bool to avoid std::vector<bool> bitfield packing, + // which would cause data races on concurrent per-index writes. 
+ std::vector<int> ProvisionResults(kHalf, 0); + std::vector<std::string> ProvisionReasons(kHalf); + std::vector<int> DeprovisionResults(kHalf, 0); + + { + WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners"); + WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers"); + + std::vector<std::future<void>> ProvisionFutures(kHalf); + std::vector<std::future<void>> DeprovisionFutures(kHalf); + + for (int I = 0; I < kHalf; ++I) + { + ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool Result = + HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); + ProvisionResults[I] = Result ? 1 : 0; + ProvisionReasons[I] = Reason; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + std::string Reason; + const bool Result = + HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); + DeprovisionResults[I] = Result ? 
1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + + for (std::future<void>& F : ProvisionFutures) + { + F.get(); + } + for (std::future<void>& F : DeprovisionFutures) + { + F.get(); + } + } + + for (int I = 0; I < kHalf; ++I) + { + CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); + CHECK(DeprovisionResults[I] != 0); + } + // Only the newly provisioned modules should remain + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); +} + +TEST_CASE("hub.concurrent_callbacks") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22300; + Config.InstanceLimit = 10; + + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionCallbacks; + std::vector<CallbackRecord> DeprovisionCallbacks; + + auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + }; + auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb)); + + constexpr int kHalf = 3; + + // Serially pre-provision kHalf modules and drain the resulting callbacks before the + // concurrent phase so we start with a clean slate. 
+ for (int I = 0; I < kHalf; ++I) + { + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + { + RwLock::ExclusiveLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + ProvisionCallbacks.clear(); + } + + // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. + std::vector<int> ProvisionResults(kHalf, 0); + std::vector<std::string> ProvisionReasons(kHalf); + std::vector<int> DeprovisionResults(kHalf, 0); + + { + WorkerThreadPool Provisioners(kHalf, "hub_cbtest_provisioners"); + WorkerThreadPool Deprovisioneers(kHalf, "hub_cbtest_deprovisioneers"); + + std::vector<std::future<void>> ProvisionFutures(kHalf); + std::vector<std::future<void>> DeprovisionFutures(kHalf); + + for (int I = 0; I < kHalf; ++I) + { + ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool Result = + HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); + ProvisionResults[I] = Result ? 1 : 0; + ProvisionReasons[I] = Reason; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + std::string Reason; + const bool Result = + HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); + DeprovisionResults[I] = Result ? 
1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + + for (std::future<void>& F : ProvisionFutures) + { + F.get(); + } + for (std::future<void>& F : DeprovisionFutures) + { + F.get(); + } + } + + // All operations must have succeeded + for (int I = 0; I < kHalf; ++I) + { + CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); + CHECK(DeprovisionResults[I] != 0); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + // Each new_* module must have triggered exactly one provision callback with a non-zero port. + // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); + + for (const CallbackRecord& Record : ProvisionCallbacks) + { + CHECK_NE(Record.Port, 0); + const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; + CHECK_MESSAGE(IsNewModule, Record.ModuleId); + } + for (const CallbackRecord& Record : DeprovisionCallbacks) + { + CHECK_NE(Record.Port, 0); + const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; + CHECK_MESSAGE(IsPreModule, Record.ModuleId); + } + } +} + +# if ZEN_PLATFORM_WINDOWS +TEST_CASE("hub.job_object") +{ + SUBCASE("UseJobObject=true") + { + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.UseJobObject = true; + Config.BasePortNumber = 22100; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + + const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + } + + SUBCASE("UseJobObject=false") + { + ScopedTemporaryDirectory 
TempDir; + Hub::Configuration Config; + Config.UseJobObject = false; + Config.BasePortNumber = 22200; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + + const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + } +} +# endif // ZEN_PLATFORM_WINDOWS + +TEST_SUITE_END(); + +void +hub_forcelink() +{ +} + +#endif // ZEN_WITH_TESTS } // namespace zen diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index caf8890ff..8b61f988d 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -22,33 +22,40 @@ class StorageServerInstance; * * Core logic for managing storage server instances on behalf of external clients. */ + +struct HubProvisionedInstanceInfo +{ + std::string BaseUri; + uint16_t Port; +}; + class Hub { public: struct Configuration { - std::filesystem::path HubBaseDir; - std::filesystem::path ChildBaseDir; - /** Enable or disable the use of a Windows Job Object for child process management. * When enabled, all spawned child processes are assigned to a job object with * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. 
*/ - bool UseJobObject = true; + bool UseJobObject = true; + uint16_t BasePortNumber = 21000; + + int InstanceLimit = 1000; }; - Hub(const Configuration& Config); + + typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc; + + Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {}, + ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {}); ~Hub(); Hub(const Hub&) = delete; Hub& operator=(const Hub&) = delete; - struct ProvisionedInstanceInfo - { - std::string BaseUri; - uint16_t Port; - }; - /** * Provision a storage server instance for the given module ID. * @@ -56,7 +63,7 @@ public: * @param OutInfo If successful, information about the provisioned instance will be returned here. * @param OutReason If unsuccessful, the reason will be returned here. */ - bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason); + bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason); /** * Deprovision a storage server instance for the given module ID. 
@@ -90,13 +97,20 @@ public: int GetInstanceCount(); - int GetInstanceLimit() const { return m_InstanceLimit; } int GetMaxInstanceCount() const { return m_MaxInstanceCount; } + const Configuration& GetConfig() const { return m_Config; } + private: - ZenServerEnvironment m_RunEnvironment; + const Configuration m_Config; + ZenServerEnvironment m_RunEnvironment; + + ProvisionModuleCallbackFunc m_ProvisionedModuleCallback; + ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback; + std::filesystem::path m_FileHydrationPath; std::filesystem::path m_HydrationTempPath; + #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif @@ -104,15 +118,17 @@ private: std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; std::unordered_set<std::string> m_DeprovisioningModules; std::unordered_set<std::string> m_ProvisioningModules; - int m_MaxInstanceCount = 0; - int m_InstanceLimit = 1000; ResourceMetrics m_ResourceLimits; SystemMetrics m_HostMetrics; - bool m_UseJobObject = true; + int m_MaxInstanceCount = 0; void UpdateStats(); void UpdateCapacityMetrics(); bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); }; +#if ZEN_WITH_TESTS +void hub_forcelink(); +#endif // ZEN_WITH_TESTS + } // namespace zen diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 64ab0bd9e..a019ff295 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -3,9 +3,10 @@ #include "zenhubserver.h" #include "frontend/frontend.h" +#include "httphubservice.h" #include "hub.h" -#include "hubservice.h" +#include <zencore/config.h> #include <zencore/fmtutils.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> @@ -38,6 +39,35 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Instance ID for use in notifications", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); + + Options.add_option("hub", + "", + 
"consul-endpoint", + "Consul endpoint URL for service registration (empty = disabled)", + cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), + ""); + + Options.add_option("hub", + "", + "hub-base-port-number", + "Base port number for provisioned instances", + cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), + ""); + + Options.add_option("hub", + "", + "hub-instance-limit", + "Maximum number of provisioned instances for this hub", + cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"), + ""); +#if ZEN_PLATFORM_WINDOWS + Options.add_option("hub", + "", + "hub-use-job-object", + "Enable the use of a Windows Job Object for child process management", + cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"), + ""); +#endif // ZEN_PLATFORM_WINDOWS } void @@ -74,6 +104,56 @@ ZenHubServer::~ZenHubServer() Cleanup(); } +void +ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +{ + if (m_ConsulClient) + { + consul::ServiceRegistrationInfo ServiceInfo{ + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", + // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri? 
+ .Port = Info.Port, + .HealthEndpoint = "health", + .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), + std::make_pair("zen-hub", std::string(HubInstanceId)), + std::make_pair("version", std::string(ZEN_CFG_VERSION))}, + .HealthIntervalSeconds = 10, + .DeregisterAfterSeconds = 30}; + + if (!m_ConsulClient->RegisterService(ServiceInfo)) + { + ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); + } + else + { + ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", + ModuleId, + Info.Port, + ServiceInfo.ServiceName); + } + } +} + +void +ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +{ + ZEN_UNUSED(HubInstanceId); + if (m_ConsulClient) + { + if (!m_ConsulClient->DeregisterService(ModuleId)) + { + ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", + ModuleId, + Info.Port); + } + else + { + ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); + } + } +} + int ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { @@ -96,6 +176,7 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); + InitializeConsulRegistration(ServerConfig, EffectiveBasePort); InitializeServices(ServerConfig); RegisterServices(ServerConfig); @@ -127,6 +208,9 @@ ZenHubServer::Cleanup() m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); + + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); } catch (const std::exception& Ex) { @@ -147,7 +231,18 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) ZEN_INFO("instantiating Hub"); m_Hub = 
std::make_unique<Hub>( - Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"}); + Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject, + .BasePortNumber = ServerConfig.HubBasePortNumber, + .InstanceLimit = ServerConfig.HubInstanceLimit}, + ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"), + m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); } + : Hub::ProvisionModuleCallbackFunc{}, + m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); } + : Hub::ProvisionModuleCallbackFunc{}); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); @@ -181,6 +276,47 @@ ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) } void +ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort) +{ + if (ServerConfig.ConsulEndpoint.empty()) + { + ZEN_INFO("Consul registration disabled (no endpoint configured)"); + return; + } + + ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint); + + try + { + m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint); + + consul::ServiceRegistrationInfo Info; + Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); + Info.ServiceName = "zen-hub"; + // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? 
+ Info.Port = static_cast<uint16_t>(EffectivePort); + Info.HealthEndpoint = "hub/health"; + Info.Tags = std::vector<std::pair<std::string, std::string>>{ + std::make_pair("zen-hub", Info.ServiceId), + std::make_pair("version", std::string(ZEN_CFG_VERSION)), + std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)), + std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)), + std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))}; + + m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info); + + ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId); + } + catch (const std::exception& Ex) + { + // REQ-F-12: Hub should start successfully even if Consul registration fails + ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what()); + m_ConsulRegistration.reset(); + m_ConsulClient.reset(); + } +} + +void ZenHubServer::Run() { if (m_ProcessMonitor.IsActive()) @@ -228,6 +364,21 @@ ZenHubServer::Run() const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + if (m_ConsulRegistration) + { + if (!m_ConsulRegistration->IsRegistered()) + { + ZEN_INFO("Waiting for consul integration to register..."); + m_ConsulRegistration->WaitForReadyEvent(2000); + } + if (!m_ConsulRegistration->IsRegistered()) + { + m_ConsulClient.reset(); + m_ConsulRegistration.reset(); + ZEN_WARN("Consul registration failed, running without consul integration"); + } + } + SetNewState(kRunning); OnReady(); diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index a3eeb2ead..f6a3eb1bc 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -4,6 +4,8 @@ #include "zenserver.h" +#include <zenutil/consul.h> + namespace cxxopts { class Options; } @@ -20,10 +22,15 @@ class HttpHubService; struct ZenHubServerConfig : public ZenServerConfig { 
std::string UpstreamNotificationEndpoint; - std::string InstanceId; // For use in notifications + std::string InstanceId; // For use in notifications + std::string ConsulEndpoint; // If set, enables Consul service registration + uint16_t HubBasePortNumber = 21000; + int HubInstanceLimit = 1000; + bool HubUseJobObject = true; }; class Hub; +struct HubProvisionedInstanceInfo; struct ZenHubServerConfigurator : public ZenServerConfiguratorBase { @@ -78,6 +85,9 @@ public: void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } private: + void OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info); + void OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info); + bool m_IsDedicatedMode = false; bool m_TestMode = false; std::filesystem::path m_DataRoot; @@ -90,9 +100,13 @@ private: std::unique_ptr<HttpApiService> m_ApiService; std::unique_ptr<HttpFrontendService> m_FrontendService; + std::unique_ptr<consul::ConsulClient> m_ConsulClient; + std::unique_ptr<consul::ServiceRegistration> m_ConsulRegistration; + void InitializeState(const ZenHubServerConfig& ServerConfig); void InitializeServices(const ZenHubServerConfig& ServerConfig); void RegisterServices(const ZenHubServerConfig& ServerConfig); + void InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort); }; } // namespace zen diff --git a/src/zenserver/storage/upstream/zen.cpp b/src/zenserver/storage/upstream/zen.cpp index 423c9039c..a3807675c 100644 --- a/src/zenserver/storage/upstream/zen.cpp +++ b/src/zenserver/storage/upstream/zen.cpp @@ -63,7 +63,7 @@ ZenStructuredCacheSession::CheckHealth() return {.Bytes = Response.DownloadedBytes, .ElapsedSeconds = Response.ElapsedSeconds, - .Success = Response.StatusCode == HttpResponseCode::OK}; + .Success = Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent}; } 
ZenCacheResult diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index 6ddebf97a..272096ebb 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -2,11 +2,14 @@ #include <zenutil/consul.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/except_fmt.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/process.h> #include <zencore/string.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <fmt/format.h> @@ -30,7 +33,8 @@ struct ConsulProcess::Impl CreateProcOptions Options; Options.Flags |= CreateProcOptions::Flag_Windows_NewProcessGroup; - CreateProcResult Result = CreateProc("consul" ZEN_EXE_SUFFIX_LITERAL, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); + const std::filesystem::path ConsulExe = GetRunningExecutablePath().parent_path() / ("consul" ZEN_EXE_SUFFIX_LITERAL); + CreateProcResult Result = CreateProc(ConsulExe, "consul" ZEN_EXE_SUFFIX_LITERAL " agent -dev", Options); if (Result) { @@ -107,9 +111,9 @@ ConsulClient::~ConsulClient() void ConsulClient::SetKeyValue(std::string_view Key, std::string_view Value) { - IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); - HttpClient::Response Result = - m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, {{"Content-Type", "text/plain"}, {"Accept", "application/json"}}); + IoBuffer ValueBuffer = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Value)); + ValueBuffer.SetContentType(HttpContentType::kText); + HttpClient::Response Result = m_HttpClient.Put(fmt::format("v1/kv/{}", Key), ValueBuffer, HttpClient::Accept(HttpContentType::kJSON)); if (!Result) { throw runtime_error("ConsulClient::SetKeyValue() failed to set key '{}' ({})", Key, Result.ErrorMessage("")); @@ -137,4 +141,243 @@ ConsulClient::DeleteKey(std::string_view Key) } } +bool +ConsulClient::RegisterService(const ServiceRegistrationInfo& Info) +{ + using namespace 
std::literals; + + CbObjectWriter Writer; + { + Writer.AddString("ID"sv, Info.ServiceId); + Writer.AddString("Name"sv, Info.ServiceName); + if (!Info.Address.empty()) + { + Writer.AddString("Address"sv, Info.Address); + } + Writer.AddInteger("Port"sv, Info.Port); + if (!Info.Tags.empty()) + { + Writer.BeginArray("Tags"sv); + for (const std::pair<std::string, std::string>& Tag : Info.Tags) + { + Writer.AddString(fmt::format("{}:{}", Tag.first, Tag.second)); + } + Writer.EndArray(); // Tags + } + Writer.BeginObject("Check"sv); + { + Writer.AddString("HTTP"sv, fmt::format("http://{}:{}/{}", Info.Address, Info.Port, Info.HealthEndpoint)); + Writer.AddString("Interval"sv, fmt::format("{}s", Info.HealthIntervalSeconds)); + Writer.AddString("DeregisterCriticalServiceAfter"sv, fmt::format("{}s", Info.DeregisterAfterSeconds)); + } + Writer.EndObject(); // Check + } + + ExtendableStringBuilder<512> SB; + CompactBinaryToJson(Writer.Save(), SB); + + IoBuffer PayloadBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()); + PayloadBuffer.SetContentType(HttpContentType::kJSON); + HttpClient::Response Result = m_HttpClient.Put("v1/agent/service/register", PayloadBuffer, HttpClient::Accept(HttpContentType::kJSON)); + + if (!Result) + { + ZEN_WARN("ConsulClient::RegisterService() failed to register service '{}' ({})", Info.ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::DeregisterService(std::string_view ServiceId) +{ + HttpClient::Response Result = + m_HttpClient.Put(fmt::format("v1/agent/service/deregister/{}", ServiceId), HttpClient::Accept(HttpContentType::kJSON)); + + if (!Result) + { + ZEN_WARN("ConsulClient::DeregisterService() failed to deregister service '{}' ({})", ServiceId, Result.ErrorMessage("")); + return false; + } + + return true; +} + +bool +ConsulClient::FindServiceInJson(std::string_view Json, std::string_view ServiceId) +{ + using namespace std::literals; + + std::string JsonError; + CbFieldIterator Root = 
LoadCompactBinaryFromJson(Json, JsonError); + if (Root && JsonError.empty()) + { + for (CbFieldView RootIterator : Root) + { + CbObjectView ServiceObject = RootIterator.AsObjectView(); + if (ServiceObject) + { + for (CbFieldView ServiceIterator : ServiceObject) + { + CbObjectView ObjectIterator = ServiceIterator.AsObjectView(); + if (ObjectIterator["ID"sv].AsString() == ServiceId) + { + return true; + } + } + } + } + } + return false; +} + +bool +ConsulClient::HasService(std::string_view ServiceId) +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return false; + } + return FindServiceInJson(Result.AsText(), ServiceId); +} + +bool +ConsulClient::WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds) +{ + // Note: m_HttpClient uses unlimited HTTP timeout (Timeout{0}); the WaitSeconds parameter + // governs the server-side bound on the blocking query. Do not add a separate client timeout. + HttpClient::KeyValueMap Parameters({{"index", std::to_string(InOutIndex)}, {"wait", fmt::format("{}s", WaitSeconds)}}); + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services", {}, Parameters); + if (!Result) + { + return false; + } + + auto IndexIt = Result.Header->find("X-Consul-Index"); + if (IndexIt != Result.Header->end()) + { + std::optional<uint64_t> ParsedIndex = ParseInt<uint64_t>(IndexIt->second); + if (ParsedIndex.has_value()) + { + InOutIndex = ParsedIndex.value(); + } + } + + return FindServiceInJson(Result.AsText(), ServiceId); +} + +std::string +ConsulClient::GetAgentServicesJson() +{ + HttpClient::Response Result = m_HttpClient.Get("v1/agent/services"); + if (!Result) + { + return "{}"; + } + return Result.ToText(); +} + +////////////////////////////////////////////////////////////////////////// + +ServiceRegistration::ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info) : m_Client(Client), m_Info(Info) +{ + m_RegistrationThread = 
std::thread(&ServiceRegistration::RegistrationLoop, this); +} + +ServiceRegistration::~ServiceRegistration() +{ + try + { + m_StopEvent.Set(); + + if (m_RegistrationThread.joinable()) + { + m_RegistrationThread.join(); + } + + if (m_IsRegistered.load()) + { + if (!m_Client->DeregisterService(m_Info.ServiceId)) + { + ZEN_WARN("ServiceRegistration: Failed to deregister service '{}' during cleanup", m_Info.ServiceId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR(":~ServiceRegistration() threw exception: {}", Ex.what()); + } +} + +void +ServiceRegistration::WaitForReadyEvent(int MaxWaitMS) +{ + if (m_IsRegistered.load()) + { + return; + } + m_ReadyWakeupEvent.Wait(MaxWaitMS); +} + +void +ServiceRegistration::RegistrationLoop() +{ + try + { + SetCurrentThreadName("ConsulRegistration"); + + // Exponential backoff delays: 100ms, 200ms, 400ms + constexpr int BackoffDelaysMs[] = {100, 200, 400}; + constexpr int MaxAttempts = 3; + constexpr int RetryIntervalMs = 5000; + + while (true) + { + bool Succeeded = false; + // Try to register with exponential backoff + for (int Attempt = 0; Attempt < MaxAttempts; ++Attempt) + { + if (m_Client->RegisterService(m_Info)) + { + Succeeded = true; + break; + } + + if (m_StopEvent.Wait(BackoffDelaysMs[Attempt])) + { + return; + } + } + + if (Succeeded || m_Client->RegisterService(m_Info)) + { + break; + } + + // All attempts failed, log warning and wait before retrying + ZEN_WARN("ServiceRegistration: Failed to register service '{}' after {} attempts, retrying in {}s", + m_Info.ServiceId, + MaxAttempts + 1, + RetryIntervalMs / 1000); + + // Wait for 5 seconds before retrying; m_StopEvent.Set() in the destructor + // wakes this immediately so the thread exits without polling. 
+ if (m_StopEvent.Wait(RetryIntervalMs)) + { + return; + } + } + + ZEN_INFO("ServiceRegistration: Successfully registered service '{}'", m_Info.ServiceId); + m_IsRegistered.store(true); + m_ReadyWakeupEvent.Set(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("ServiceRegistration::RegistrationLoop failed with exception: {}", Ex.what()); + } +} + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/consul.h b/src/zenutil/include/zenutil/consul.h index 08871fa66..b5bfa42d1 100644 --- a/src/zenutil/include/zenutil/consul.h +++ b/src/zenutil/include/zenutil/consul.h @@ -5,11 +5,26 @@ #include <zenbase/zenbase.h> #include <zenhttp/httpclient.h> +#include <atomic> +#include <cstdint> #include <string> #include <string_view> +#include <thread> namespace zen::consul { +struct ServiceRegistrationInfo +{ + std::string ServiceId; + std::string ServiceName; + std::string Address; + uint16_t Port = 0; + std::string HealthEndpoint; + std::vector<std::pair<std::string, std::string>> Tags; + int HealthIntervalSeconds = 10; + int DeregisterAfterSeconds = 30; +}; + class ConsulClient { public: @@ -23,7 +38,22 @@ public: std::string GetKeyValue(std::string_view Key); void DeleteKey(std::string_view Key); + bool RegisterService(const ServiceRegistrationInfo& Info); + bool DeregisterService(std::string_view ServiceId); + + // Query methods for testing + bool HasService(std::string_view ServiceId); + std::string GetAgentServicesJson(); + + // Blocking query on v1/agent/services. Blocks until the service list changes or + // the wait period expires. InOutIndex must be 0 for the first call; it is updated + // to the new X-Consul-Index value on success (left unchanged on error). + // Returns true if ServiceId is present in the response. 
+ bool WatchService(std::string_view ServiceId, uint64_t& InOutIndex, int WaitSeconds); + private: + static bool FindServiceInJson(std::string_view Json, std::string_view ServiceId); + HttpClient m_HttpClient; }; @@ -44,4 +74,36 @@ private: std::unique_ptr<Impl> m_Impl; }; +/** + * RAII wrapper for Consul service registration. + * + * Manages the lifecycle of a Consul service registration with: + * - Async registration in a background thread + * - Exponential backoff retry on failure (100ms, 200ms, 400ms) + * - Continuous background retry every 5 seconds after initial failures until registration succeeds + * - Automatic deregistration on destruction + */ +class ServiceRegistration +{ +public: + ServiceRegistration(ConsulClient* Client, const ServiceRegistrationInfo& Info); + ~ServiceRegistration(); + + ServiceRegistration(const ServiceRegistration&) = delete; + ServiceRegistration& operator=(const ServiceRegistration&) = delete; + + bool IsRegistered() const { return m_IsRegistered.load(); } + void WaitForReadyEvent(int MaxWaitMS); + +private: + ConsulClient* m_Client; + ServiceRegistrationInfo m_Info; + std::atomic<bool> m_IsRegistered{false}; + std::thread m_RegistrationThread; + Event m_ReadyWakeupEvent; + Event m_StopEvent; + + void RegistrationLoop(); +}; + } // namespace zen::consul diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 2a8617162..1b8750628 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -29,9 +29,34 @@ class CbObject; class ZenServerEnvironment { public: + enum EStorageTag + { + Storage + }; + enum EHubTag + { + Hub + }; + enum ETestTag + { + Test + }; + ZenServerEnvironment(); ~ZenServerEnvironment(); + ZenServerEnvironment(ZenServerEnvironment&&); + + ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir); + ZenServerEnvironment(EHubTag, + std::filesystem::path ProgramBaseDir, + 
std::filesystem::path TestBaseDir, + std::string_view ServerClass = ""); + ZenServerEnvironment(ETestTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass = ""); + void Initialize(std::filesystem::path ProgramBaseDir); void InitializeForTest(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = ""); void InitializeForHub(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = ""); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index b09c2d89a..b9c50be4f 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -503,6 +503,39 @@ ZenServerEnvironment::~ZenServerEnvironment() { } +ZenServerEnvironment::ZenServerEnvironment(ZenServerEnvironment&& Other) +: m_ProgramBaseDir(std::move(Other.m_ProgramBaseDir)) +, m_ChildProcessBaseDir(std::move(Other.m_ChildProcessBaseDir)) +, m_IsInitialized(Other.m_IsInitialized) +, m_IsTestInstance(Other.m_IsTestInstance) +, m_IsHubInstance(Other.m_IsHubInstance) +, m_PassthroughOutput(Other.m_PassthroughOutput) +, m_ServerClass(std::move(Other.m_ServerClass)) +, m_NextPortNumber(Other.m_NextPortNumber.load()) +{ +} + +ZenServerEnvironment::ZenServerEnvironment(EStorageTag, std::filesystem::path ProgramBaseDir) +{ + Initialize(ProgramBaseDir); +} + +ZenServerEnvironment::ZenServerEnvironment(EHubTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass) +{ + InitializeForHub(ProgramBaseDir, TestBaseDir, ServerClass); +} + +ZenServerEnvironment::ZenServerEnvironment(ETestTag, + std::filesystem::path ProgramBaseDir, + std::filesystem::path TestBaseDir, + std::string_view ServerClass) +{ + InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); +} + void ZenServerEnvironment::Initialize(std::filesystem::path ProgramBaseDir) { |