diff options
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 507 |
1 files changed, 494 insertions, 13 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index f1cc86de6..c9720b32d 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -15,6 +15,13 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include <zencore/filesystem.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +#endif + namespace zen { /////////////////////////////////////////////////////////////////////////// @@ -111,14 +118,19 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject) +Hub::Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + ProvisionModuleCallbackFunc&& ProvisionedModuleCallback, + ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback) +: m_Config(Config) +, m_RunEnvironment(std::move(RunEnvironment)) +, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback)) +, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback)) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; - m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir); - m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); @@ -128,13 +140,10 @@ Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject) // This is necessary to ensure the hub assigns a distinct port range. // We need to do this primarily because otherwise automated tests will // fail as the test runner will create processes in the default range. - // We should probably make this configurable or dynamic for maximum - // flexibility, and to allow running multiple hubs on the same host if - // necessary. - m_RunEnvironment.SetNextPortNumber(21000); + m_RunEnvironment.SetNextPortNumber(m_Config.BasePortNumber); #if ZEN_PLATFORM_WINDOWS - if (m_UseJobObject) + if (m_Config.UseJobObject) { m_JobObject.Initialize(); if (m_JobObject.IsValid()) @@ -158,6 +167,20 @@ Hub::~Hub() m_Lock.WithExclusiveLock([this] { for (auto& [ModuleId, Instance] : m_Instances) { + uint16_t BasePort = Instance->GetBasePort(); + std::string BaseUri; // TODO? + + if (m_DeprovisionedModuleCallback) + { + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } Instance->Deprovision(); } m_Instances.clear(); @@ -170,7 +193,7 @@ Hub::~Hub() } bool -Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) +Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) { StorageServerInstance* Instance = nullptr; bool IsNewInstance = false; @@ -241,9 +264,20 @@ Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std: } OutInfo.Port = Instance->GetBasePort(); - // TODO: base URI? Would need to know what host name / IP to use + if (m_ProvisionedModuleCallback) + { + try + { + m_ProvisionedModuleCallback(ModuleId, OutInfo); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } + return true; } @@ -279,6 +313,21 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) } } + uint16_t BasePort = Instance->GetBasePort(); + std::string BaseUri; // TODO? + + if (m_DeprovisionedModuleCallback) + { + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } + // The module is deprovisioned outside the lock to avoid blocking other operations. // // To ensure that no new provisioning can occur while we're deprovisioning, @@ -362,9 +411,9 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return false; } - if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) + if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit) { - OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); + OutReason = fmt::format("instance limit exceeded ({})", m_Config.InstanceLimit); return false; } @@ -374,6 +423,438 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return true; } -/////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + +TEST_SUITE_BEGIN("server.hub"); + +namespace hub_testutils { + + ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) + { + return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); + } + + std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, + Hub::Configuration Config = {}, + Hub::ProvisionModuleCallbackFunc ProvisionCallback = {}, + Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {}) + { + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback)); + } + +} // namespace hub_testutils + +TEST_CASE("hub.provision_basic") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("module_a")); + + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + CHECK(HubInstance->Find("module_a")); + + const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("module_a")); +} + +TEST_CASE("hub.provision_callbacks") +{ + ScopedTemporaryDirectory TempDir; + + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionRecords; + std::vector<CallbackRecord> DeprovisionRecords; + + auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); }); + }; + auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb)); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionRecords.size(), 1u); + CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module"); + CHECK_EQ(ProvisionRecords[0].Port, Info.Port); + CHECK_NE(ProvisionRecords[0].Port, 0); + } + + const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); + CHECK(DeprovisionResult); + + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(DeprovisionRecords.size(), 1u); + CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); + CHECK_NE(DeprovisionRecords[0].Port, 0); + CHECK_EQ(ProvisionRecords.size(), 1u); + } +} + +TEST_CASE("hub.instance_limit") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.InstanceLimit = 2; + Config.BasePortNumber = 21500; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason); + REQUIRE_MESSAGE(FirstResult, Reason); + + const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason); + REQUIRE_MESSAGE(SecondResult, Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); + + Reason.clear(); + const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason); + CHECK_FALSE(ThirdResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); + CHECK_NE(Reason.find("instance limit"), std::string::npos); + + HubInstance->Deprovision("limit_a", Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + + Reason.clear(); + const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason); + CHECK_MESSAGE(FourthResult, Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 2); +} + +TEST_CASE("hub.deprovision_nonexistent") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + std::string Reason; + const bool Result = HubInstance->Deprovision("never_provisioned", Reason); + CHECK_FALSE(Result); + CHECK(Reason.empty()); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); +} + +TEST_CASE("hub.enumerate_modules") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason); + REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); + + std::vector<std::string> Ids; + HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + CHECK_EQ(Ids.size(), 2u); + const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); + const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); + CHECK(FoundA); + CHECK(FoundB); + + HubInstance->Deprovision("enum_a", Reason); + Ids.clear(); + HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "enum_b"); +} + +TEST_CASE("hub.max_instance_count") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); + + HubProvisionedInstanceInfo Info; + std::string Reason; + + REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason); + CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); + + REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason); + CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); + + HubInstance->Deprovision("max_a", Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); +} + +TEST_CASE("hub.concurrent") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22000; + Config.InstanceLimit = 10; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + constexpr int kHalf = 3; + + // Serially pre-provision kHalf modules + for (int I = 0; I < kHalf; ++I) + { + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + // Simultaneously: + // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N") + // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N") + // The two pools use distinct OS threads, so provisions and deprovisions are interleaved. + + // Use int rather than bool to avoid std::vector<bool> bitfield packing, + // which would cause data races on concurrent per-index writes. + std::vector<int> ProvisionResults(kHalf, 0); + std::vector<std::string> ProvisionReasons(kHalf); + std::vector<int> DeprovisionResults(kHalf, 0); + + { + WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners"); + WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers"); + + std::vector<std::future<void>> ProvisionFutures(kHalf); + std::vector<std::future<void>> DeprovisionFutures(kHalf); + + for (int I = 0; I < kHalf; ++I) + { + ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool Result = + HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); + ProvisionResults[I] = Result ? 1 : 0; + ProvisionReasons[I] = Reason; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + std::string Reason; + const bool Result = + HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); + DeprovisionResults[I] = Result ? 1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + + for (std::future<void>& F : ProvisionFutures) + { + F.get(); + } + for (std::future<void>& F : DeprovisionFutures) + { + F.get(); + } + } + + for (int I = 0; I < kHalf; ++I) + { + CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); + CHECK(DeprovisionResults[I] != 0); + } + // Only the newly provisioned modules should remain + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); +} + +TEST_CASE("hub.concurrent_callbacks") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22300; + Config.InstanceLimit = 10; + + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionCallbacks; + std::vector<CallbackRecord> DeprovisionCallbacks; + + auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + }; + auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb)); + + constexpr int kHalf = 3; + + // Serially pre-provision kHalf modules and drain the resulting callbacks before the + // concurrent phase so we start with a clean slate. + for (int I = 0; I < kHalf; ++I) + { + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + { + RwLock::ExclusiveLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + ProvisionCallbacks.clear(); + } + + // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. + std::vector<int> ProvisionResults(kHalf, 0); + std::vector<std::string> ProvisionReasons(kHalf); + std::vector<int> DeprovisionResults(kHalf, 0); + + { + WorkerThreadPool Provisioners(kHalf, "hub_cbtest_provisioners"); + WorkerThreadPool Deprovisioneers(kHalf, "hub_cbtest_deprovisioneers"); + + std::vector<std::future<void>> ProvisionFutures(kHalf); + std::vector<std::future<void>> DeprovisionFutures(kHalf); + + for (int I = 0; I < kHalf; ++I) + { + ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + std::string Reason; + const bool Result = + HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); + ProvisionResults[I] = Result ? 1 : 0; + ProvisionReasons[I] = Reason; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + std::string Reason; + const bool Result = + HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); + DeprovisionResults[I] = Result ? 1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + + for (std::future<void>& F : ProvisionFutures) + { + F.get(); + } + for (std::future<void>& F : DeprovisionFutures) + { + F.get(); + } + } + + // All operations must have succeeded + for (int I = 0; I < kHalf; ++I) + { + CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); + CHECK(DeprovisionResults[I] != 0); + } + CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); + + // Each new_* module must have triggered exactly one provision callback with a non-zero port. + // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. + { + RwLock::SharedLockScope _(CallbackMutex); + REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); + + for (const CallbackRecord& Record : ProvisionCallbacks) + { + CHECK_NE(Record.Port, 0); + const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; + CHECK_MESSAGE(IsNewModule, Record.ModuleId); + } + for (const CallbackRecord& Record : DeprovisionCallbacks) + { + CHECK_NE(Record.Port, 0); + const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; + CHECK_MESSAGE(IsPreModule, Record.ModuleId); + } + } +} + +# if ZEN_PLATFORM_WINDOWS +TEST_CASE("hub.job_object") +{ + SUBCASE("UseJobObject=true") + { + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.UseJobObject = true; + Config.BasePortNumber = 22100; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + + const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + } + + SUBCASE("UseJobObject=false") + { + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.UseJobObject = false; + Config.BasePortNumber = 22200; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + std::string Reason; + + const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason); + REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK_NE(Info.Port, 0); + + const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason); + CHECK(DeprovisionResult); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + } +} +# endif // ZEN_PLATFORM_WINDOWS + +TEST_SUITE_END(); + +void +hub_forcelink() +{ +} + +#endif // ZEN_WITH_TESTS } // namespace zen |