diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-23 23:52:27 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 23:52:27 +0100 |
| commit | 658a5fea740d97033cd12aa37bd6ecd32b15a924 (patch) | |
| tree | 517f69ebeb87fc2e995eb6d158acd3d17bc25cca /src/zenserver/hub/hub.cpp | |
| parent | Cross-platform process metrics support (#887) (diff) | |
| download | zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.tar.xz zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.zip | |
refactor hub notifications (#888)
* refactor hub callbacks
* improve http responses
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 353 |
1 files changed, 199 insertions, 154 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index e8487d7d9..6a2609443 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -122,14 +122,10 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - ProvisionModuleCallbackFunc&& ProvisionedModuleCallback, - ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback) +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback)) -, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback)) +, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; @@ -178,51 +174,59 @@ Hub::~Hub() { try { - ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + // Safety call - should normally be properly Shutdown by owner + Shutdown(); + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} - m_WatchDogEvent.Set(); - if (m_WatchDog.joinable()) - { - m_WatchDog.join(); - } +void +Hub::Shutdown() +{ + ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + m_WatchDogEvent.Set(); + if (m_WatchDog.joinable()) + { + m_WatchDog.join(); + } - m_WatchDog = {}; + m_WatchDog = {}; - // WatchDog has been joined; no concurrent access is possible - m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + m_Lock.WithExclusiveLock([this] { + for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + { + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; { - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - { - StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); + StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); - uint16_t BasePort = InstanceRaw->GetBasePort(); - std::string BaseUri; // TODO? + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - if (m_DeprovisionedModuleCallback) - { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } - Instance.Deprovision(); + try + { + (void)Instance.Deprovision(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); } - InstanceRaw.reset(); + // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire. + NewState = HubInstanceState::Unprovisioned; + Instance = {}; } - m_InstanceLookup.clear(); - m_ActiveInstances.clear(); - m_FreeActiveInstanceIndexes.clear(); - }); - } - catch (const std::exception& e) - { - ZEN_WARN("Exception during hub service shutdown: {}", e.what()); - } + InstanceRaw.reset(); + } + m_InstanceLookup.clear(); + m_ActiveInstances.clear(); + m_FreeActiveInstanceIndexes.clear(); + }); } bool @@ -324,6 +328,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveProvisioningModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); @@ -341,14 +351,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s try { - Instance.Provision(); + (void)Instance.Provision(); // false = already in target state (idempotent); not an error + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); + NewState = Instance.GetState(); + Instance = {}; if (IsNewInstance) { @@ -370,31 +381,20 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s try { DestroyInstance.reset(); + NewState = HubInstanceState::Unprovisioned; } - catch (const std::exception& Ex) + catch (const std::exception& DestroyEx) { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, Ex.what()); + ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); } } - return false; + throw; } OutReason.clear(); OutInfo.Port = AllocatedPort; // TODO: base URI? Would need to know what host name / IP to use - if (m_ProvisionedModuleCallback) - { - try - { - m_ProvisionedModuleCallback(ModuleId, OutInfo); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } - return true; } @@ -447,20 +447,11 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); - uint16_t BasePort = RawInstance->GetBasePort(); - std::string BaseUri; // TODO? - - if (m_DeprovisionedModuleCallback) - { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); // The module is deprovisioned outside the hub lock to avoid blocking other operations. // @@ -476,8 +467,19 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) } }); - Instance.Deprovision(); - + try + { + (void)Instance.Deprovision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. + NewState = HubInstanceState::Unprovisioned; + Instance = {}; + throw; + } + NewState = Instance.GetState(); Instance = {}; OutReason.clear(); @@ -513,6 +515,11 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) } ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); auto RemoveHibernatingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); @@ -527,19 +534,22 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) if (!Instance.Hibernate()) { OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); return false; } + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); - return false; + NewState = Instance.GetState(); + Instance = {}; + throw; } OutReason.clear(); + return true; } @@ -573,6 +583,12 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveWakingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_WakingModules.erase(ModuleId); @@ -586,19 +602,22 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) if (!Instance.Wake()) { OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); return false; } + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); - return false; + NewState = Instance.GetState(); + Instance = {}; + throw; } OutReason.clear(); + return true; } @@ -751,6 +770,12 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveRecoveringModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_RecoveringModules.erase(std::string(ModuleId)); @@ -762,30 +787,24 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) return; } - const uint16_t Port = RawInstance->GetBasePort(); - std::string BaseUri; // TODO? - if (Instance.RecoverFromCrash()) { + NewState = Instance.GetState(); Instance = {}; return; } // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup. - Instance.Deprovision(); - Instance = {}; - - if (m_DeprovisionedModuleCallback) + try { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = Port}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for recovered module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } + (void)Instance.Deprovision(); } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); + } + NewState = Instance.GetState(); + Instance = {}; std::unique_ptr<StorageServerInstance> DestroyInstance; { @@ -798,7 +817,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); } - m_FreePorts.push_back(Port); + m_FreePorts.push_back(BasePort); m_RecoveringModules.erase(std::string(ModuleId)); } RemoveRecoveringModule.Dismiss(); @@ -806,6 +825,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) try { DestroyInstance.reset(); + NewState = HubInstanceState::Unprovisioned; } catch (const std::exception& Ex) { @@ -884,6 +904,29 @@ Hub::WatchDog() } } +void +Hub::OnStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState& NewState, + uint16_t BasePort, + std::string_view BaseUri) +{ + if (m_ModuleStateChangeCallback && OldState != NewState) + { + try + { + m_ModuleStateChangeCallback(ModuleId, + HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort}, + OldState, + NewState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what()); + } + } +} + #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); @@ -897,14 +940,44 @@ namespace hub_testutils { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); } - std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, - Hub::Configuration Config = {}, - Hub::ProvisionModuleCallbackFunc ProvisionCallback = {}, - Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {}) + std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, + Hub::Configuration Config = {}, + Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback)); + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + + struct StateChangeCapture + { + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionCallbacks; + std::vector<CallbackRecord> DeprovisionCallbacks; + + auto CaptureFunc() + { + return [this](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + ZEN_UNUSED(PreviousState); + if (NewState == HubInstanceState::Provisioned) + { + CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + else if (NewState == HubInstanceState::Unprovisioned) + { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + }; + } + }; + } // namespace hub_testutils TEST_CASE("hub.provision_basic") @@ -998,23 +1071,9 @@ TEST_CASE("hub.provision_callbacks") { ScopedTemporaryDirectory TempDir; - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionRecords; - std::vector<CallbackRecord> DeprovisionRecords; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; + hub_testutils::StateChangeCapture CaptureInstance; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb)); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); HubProvisionedInstanceInfo Info; std::string Reason; @@ -1023,11 +1082,11 @@ TEST_CASE("hub.provision_callbacks") REQUIRE_MESSAGE(ProvisionResult, Reason); { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionRecords.size(), 1u); - CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module"); - CHECK_EQ(ProvisionRecords[0].Port, Info.Port); - CHECK_NE(ProvisionRecords[0].Port, 0); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); } { @@ -1044,11 +1103,11 @@ TEST_CASE("hub.provision_callbacks") } { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(DeprovisionRecords.size(), 1u); - CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); - CHECK_NE(DeprovisionRecords[0].Port, 0); - CHECK_EQ(ProvisionRecords.size(), 1u); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); } } @@ -1158,23 +1217,9 @@ TEST_CASE("hub.concurrent_callbacks") Config.BasePortNumber = 22300; Config.InstanceLimit = 10; - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionCallbacks; - std::vector<CallbackRecord> DeprovisionCallbacks; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; + hub_testutils::StateChangeCapture CaptureInstance; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb)); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc()); constexpr int kHalf = 3; @@ -1189,9 +1234,9 @@ TEST_CASE("hub.concurrent_callbacks") CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); { - RwLock::ExclusiveLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - ProvisionCallbacks.clear(); + RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + CaptureInstance.ProvisionCallbacks.clear(); } // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. @@ -1248,17 +1293,17 @@ TEST_CASE("hub.concurrent_callbacks") // Each new_* module must have triggered exactly one provision callback with a non-zero port. // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); - for (const CallbackRecord& Record : ProvisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; CHECK_MESSAGE(IsNewModule, Record.ModuleId); } - for (const CallbackRecord& Record : DeprovisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; |