diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-21 23:13:34 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-21 23:13:34 +0100 |
| commit | e3388acaca0ce6f1a2d4cb17e535497f2689118a (patch) | |
| tree | 817948a42b57ebd07f31d8317065c2667eddb699 /src/zenserver/hub/hub.cpp | |
| parent | Interprocess pipe support (for stdout/stderr capture) (#866) (diff) | |
| download | zen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.tar.xz zen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.zip | |
zen hub command (#877)
- Feature: Added `zen hub` command for managing a hub server and its provisioned module instances:
- `zen hub up` - Start a hub server (equivalent to `zen up` in hub mode)
- `zen hub down` - Shut down a hub server
- `zen hub provision <moduleid>` - Provision a storage server instance for a module
- `zen hub deprovision <moduleid>` - Deprovision a storage server instance
- `zen hub hibernate <moduleid>` - Hibernate a provisioned instance (shut down, data preserved)
- `zen hub wake <moduleid>` - Wake a hibernated instance
- `zen hub status [moduleid]` - Show state of all instances or a specific module
- Feature: Added new hub HTTP endpoints for instance lifecycle management:
- `POST /hub/modules/{moduleid}/hibernate` - Hibernate the instance for the given module
- `POST /hub/modules/{moduleid}/wake` - Wake a hibernated instance for the given module
- Improvement: `zen up` refactored to use shared `StartupZenServer`/`ShutdownZenServer` helpers (also used by `zen hub up`/`zen hub down`)
- Bugfix: Fixed shutdown event not being cleared after the server process exits in `ZenServerInstance::Shutdown()`, which could cause stale state on reuse
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 372 |
1 files changed, 244 insertions, 128 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index b0208db1f..54f45e511 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -340,7 +340,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; + Instance = {}; + OutReason = Ex.what(); if (IsNewInstance) { @@ -371,6 +372,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s return false; } + OutReason.clear(); OutInfo.Port = AllocatedPort; // TODO: base URI? Would need to know what host name / IP to use @@ -398,7 +400,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) { RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + if (m_ProvisioningModules.contains(ModuleId)) { OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); @@ -411,7 +413,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - // Not found, OutReason left empty + OutReason.clear(); // empty = not found (-> 404) return false; } else @@ -463,7 +465,126 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) Instance.Deprovision(); Instance = {}; + OutReason.clear(); + + return true; +} + +bool +Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + { + OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); + return false; + } + + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + OutReason.clear(); // empty = not found (-> 404) + return false; + } + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + m_HibernatingModules.emplace(ModuleId); + } + + ZEN_ASSERT(Instance); + + auto RemoveHibernatingModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(ModuleId); + }); + + // NOTE: done while not holding the hub lock, as hibernation may take time. + // m_HibernatingModules tracks which modules are being hibernated, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + try + { + if (!Instance.Hibernate()) + { + OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); + return false; + } + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + OutReason = Ex.what(); + return false; + } + OutReason.clear(); + return true; +} + +bool +Hub::Wake(const std::string& ModuleId, std::string& OutReason) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + { + OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); + return false; + } + + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + OutReason.clear(); // empty = not found (-> 404) + return false; + } + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + m_WakingModules.emplace(ModuleId); + } + + ZEN_ASSERT(Instance); + + auto RemoveWakingModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(ModuleId); + }); + + // NOTE: done while not holding the hub lock, as waking may take time. + // m_WakingModules tracks which modules are being woken, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + try + { + if (!Instance.Wake()) + { + OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); + return false; + } + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + OutReason = Ex.what(); + return false; + } + + OutReason.clear(); return true; } @@ -547,14 +668,14 @@ Hub::UpdateStats() bool Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) { - if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + if (m_DeprovisioningModules.contains(std::string(ModuleId))) { OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); return false; } - if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + if (m_ProvisioningModules.contains(std::string(ModuleId))) { OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); @@ -632,10 +753,10 @@ Hub::WatchDog() } else if (LockedInstance.GetState() == HubInstanceState::Provisioned) { - // Process is not running but state says it should be — instance died unexpectedly. + // Process is not running but state says it should be - instance died unexpectedly. // TODO: Track and attempt recovery. } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) — expected, skip. + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) - expected, skip. LockedInstance = {}; } } @@ -652,6 +773,8 @@ Hub::WatchDog() TEST_SUITE_BEGIN("server.hub"); +static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; + namespace hub_testutils { ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) @@ -681,16 +804,28 @@ TEST_CASE("hub.provision_basic") std::string Reason; const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK(Reason.empty()); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); CHECK(DeprovisionResult); + CHECK(Reason.empty()); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } TEST_CASE("hub.provision_config") @@ -728,10 +863,20 @@ TEST_CASE("hub.provision_config") CHECK(TestResponse.IsSuccess()); CHECK(TestResponse.AsObject()["ok"].AsBool()); + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); CHECK(DeprovisionResult); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } TEST_CASE("hub.provision_callbacks") @@ -770,10 +915,20 @@ TEST_CASE("hub.provision_callbacks") CHECK_NE(ProvisionRecords[0].Port, 0); } + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); CHECK(DeprovisionResult); { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + { RwLock::SharedLockScope _(CallbackMutex); REQUIRE_EQ(DeprovisionRecords.size(), 1u); CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); @@ -816,18 +971,6 @@ TEST_CASE("hub.instance_limit") CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.deprovision_nonexistent") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - std::string Reason; - const bool Result = HubInstance->Deprovision("never_provisioned", Reason); - CHECK_FALSE(Result); - CHECK(Reason.empty()); - CHECK_EQ(HubInstance->GetInstanceCount(), 0); -} - TEST_CASE("hub.enumerate_modules") { ScopedTemporaryDirectory TempDir; @@ -840,11 +983,16 @@ TEST_CASE("hub.enumerate_modules") REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); std::vector<std::string> Ids; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) { + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); - CHECK_EQ(Info.State, HubInstanceState::Provisioned); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } }); CHECK_EQ(Ids.size(), 2u); + CHECK_EQ(ProvisionedCount, 2); const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); CHECK(FoundA); @@ -852,12 +1000,17 @@ TEST_CASE("hub.enumerate_modules") HubInstance->Deprovision("enum_a", Reason); Ids.clear(); - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) { + ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); - CHECK_EQ(Info.State, HubInstanceState::Provisioned); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } }); REQUIRE_EQ(Ids.size(), 1u); CHECK_EQ(Ids[0], "enum_b"); + CHECK_EQ(ProvisionedCount, 1); } TEST_CASE("hub.max_instance_count") @@ -883,84 +1036,6 @@ TEST_CASE("hub.max_instance_count") CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); } -TEST_CASE("hub.concurrent") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22000; - Config.InstanceLimit = 10; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - constexpr int kHalf = 3; - - // Serially pre-provision kHalf modules - for (int I = 0; I < kHalf; ++I) - { - HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); - } - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); - - // Simultaneously: - // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N") - // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N") - // The two pools use distinct OS threads, so provisions and deprovisions are interleaved. - - // Use int rather than bool to avoid std::vector<bool> bitfield packing, - // which would cause data races on concurrent per-index writes. - std::vector<int> ProvisionResults(kHalf, 0); - std::vector<std::string> ProvisionReasons(kHalf); - std::vector<int> DeprovisionResults(kHalf, 0); - - { - WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners"); - WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers"); - - std::vector<std::future<void>> ProvisionFutures(kHalf); - std::vector<std::future<void>> DeprovisionFutures(kHalf); - - for (int I = 0; I < kHalf; ++I) - { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); - } - - for (std::future<void>& F : ProvisionFutures) - { - F.get(); - } - for (std::future<void>& F : DeprovisionFutures) - { - F.get(); - } - } - - for (int I = 0; I < kHalf; ++I) - { - CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); - CHECK(DeprovisionResults[I] != 0); - } - // Only the newly provisioned modules should remain - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); -} - TEST_CASE("hub.concurrent_callbacks") { ScopedTemporaryDirectory TempDir; @@ -1122,57 +1197,98 @@ TEST_CASE("hub.job_object") } # endif // ZEN_PLATFORM_WINDOWS -TEST_CASE("hub.instance_state_basic") +TEST_CASE("hub.hibernate_wake") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; - Config.BasePortNumber = 22400; + Config.BasePortNumber = 22600; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("state_a", ProvInfo, Reason), Reason); + // Provision + REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason); + CHECK(Reason.empty()); + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Hibernate + const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason); + REQUIRE_MESSAGE(HibernateResult, Reason); + CHECK(Reason.empty()); + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - REQUIRE(HubInstance->Find("state_a", &Info)); + // Wake + const bool WakeResult = HubInstance->Wake("hib_a", Reason); + REQUIRE_MESSAGE(WakeResult, Reason); + CHECK(Reason.empty()); + REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } - HubInstance->Deprovision("state_a", Reason); - CHECK_FALSE(HubInstance->Find("state_a")); + // Deprovision + const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason); + CHECK(DeprovisionResult); + CHECK(Reason.empty()); + CHECK_FALSE(HubInstance->Find("hib_a")); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } -TEST_CASE("hub.instance_state_enumerate") +TEST_CASE("hub.hibernate_wake_errors") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; - Config.BasePortNumber = 22500; + Config.BasePortNumber = 22700; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("estate_a", ProvInfo, Reason), Reason); - REQUIRE_MESSAGE(HubInstance->Provision("estate_b", ProvInfo, Reason), Reason); - int ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(ProvisionedCount, 2); + // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404) + CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason)); + CHECK(Reason.empty()); - HubInstance->Deprovision("estate_a", Reason); + CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason)); + CHECK(Reason.empty()); - ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(ProvisionedCount, 1); + // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400) + REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason); + CHECK(Reason.empty()); + REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason); + CHECK(Reason.empty()); + + Reason.clear(); + CHECK_FALSE(HubInstance->Hibernate("err_b", Reason)); + CHECK_FALSE(Reason.empty()); + + // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400) + REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason); + CHECK(Reason.empty()); + + Reason.clear(); + CHECK_FALSE(HubInstance->Wake("err_b", Reason)); + CHECK_FALSE(Reason.empty()); + + // Deprovision not-found - should return false with empty reason (-> 404) + CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason)); + CHECK(Reason.empty()); } TEST_SUITE_END(); |