diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-23 23:52:27 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 23:52:27 +0100 |
| commit | 658a5fea740d97033cd12aa37bd6ecd32b15a924 (patch) | |
| tree | 517f69ebeb87fc2e995eb6d158acd3d17bc25cca /src | |
| parent | Cross-platform process metrics support (#887) (diff) | |
| download | zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.tar.xz zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.zip | |
refactor hub notifications (#888)
* refactor hub callbacks
* improve http responses
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 28 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 353 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 55 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 79 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 22 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 49 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 8 |
7 files changed, 371 insertions, 223 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index a91e36128..34f4294e4 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -269,9 +269,6 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod return; } - // TODO: A separate http request for the modules/{moduleid}/deprovision endpoint can be called and deprovision the instance leaving us - // with a dangling pointer... - CbObjectWriter Obj; Obj << "moduleId" << ModuleId; Obj << "state" << ToString(InstanceInfo.State); @@ -288,12 +285,29 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view return; } - if (InstanceInfo.State == HubInstanceState::Provisioned) + if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated || + InstanceInfo.State == HubInstanceState::Crashed) { - std::string Reason; - if (!m_Hub.Deprovision(std::string(ModuleId), Reason)) + std::string FailureReason; + try + { + if (!m_Hub.Deprovision(std::string(ModuleId), FailureReason)) + { + if (FailureReason.empty()) + { + Request.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); + } + return; + } + } + catch (const std::exception& Ex) { - Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Reason); + ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); + Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Ex.what()); return; } } diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index e8487d7d9..6a2609443 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -122,14 +122,10 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - ProvisionModuleCallbackFunc&& ProvisionedModuleCallback, - ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback) +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback)) -, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback)) +, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; @@ -178,51 +174,59 @@ Hub::~Hub() { try { - ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + // Safety call - should normally be properly Shutdown by owner + Shutdown(); + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} - m_WatchDogEvent.Set(); - if (m_WatchDog.joinable()) - { - m_WatchDog.join(); - } +void +Hub::Shutdown() +{ + ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + m_WatchDogEvent.Set(); + if (m_WatchDog.joinable()) + { + m_WatchDog.join(); + } - m_WatchDog = {}; + m_WatchDog = {}; - // WatchDog has been joined; no concurrent access is possible - m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + m_Lock.WithExclusiveLock([this] { + for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + { + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; { - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - { - StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); + StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); - uint16_t BasePort = InstanceRaw->GetBasePort(); - std::string BaseUri; // TODO? + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - if (m_DeprovisionedModuleCallback) - { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } - Instance.Deprovision(); + try + { + (void)Instance.Deprovision(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); } - InstanceRaw.reset(); + // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire. + NewState = HubInstanceState::Unprovisioned; + Instance = {}; } - m_InstanceLookup.clear(); - m_ActiveInstances.clear(); - m_FreeActiveInstanceIndexes.clear(); - }); - } - catch (const std::exception& e) - { - ZEN_WARN("Exception during hub service shutdown: {}", e.what()); - } + InstanceRaw.reset(); + } + m_InstanceLookup.clear(); + m_ActiveInstances.clear(); + m_FreeActiveInstanceIndexes.clear(); + }); } bool @@ -324,6 +328,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveProvisioningModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); @@ -341,14 +351,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s try { - Instance.Provision(); + (void)Instance.Provision(); // false = already in target state (idempotent); not an error + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); + NewState = Instance.GetState(); + Instance = {}; if (IsNewInstance) { @@ -370,31 +381,20 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s try { DestroyInstance.reset(); + NewState = HubInstanceState::Unprovisioned; } - catch (const std::exception& Ex) + catch (const std::exception& DestroyEx) { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, Ex.what()); + ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); } } - return false; + throw; } OutReason.clear(); OutInfo.Port = AllocatedPort; // TODO: base URI? Would need to know what host name / IP to use - if (m_ProvisionedModuleCallback) - { - try - { - m_ProvisionedModuleCallback(ModuleId, OutInfo); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } - return true; } @@ -447,20 +447,11 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); - uint16_t BasePort = RawInstance->GetBasePort(); - std::string BaseUri; // TODO? - - if (m_DeprovisionedModuleCallback) - { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } - } + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); // The module is deprovisioned outside the hub lock to avoid blocking other operations. // @@ -476,8 +467,19 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) } }); - Instance.Deprovision(); - + try + { + (void)Instance.Deprovision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. + NewState = HubInstanceState::Unprovisioned; + Instance = {}; + throw; + } + NewState = Instance.GetState(); Instance = {}; OutReason.clear(); @@ -513,6 +515,11 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) } ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); auto RemoveHibernatingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); @@ -527,19 +534,22 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) if (!Instance.Hibernate()) { OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); return false; } + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); - return false; + NewState = Instance.GetState(); + Instance = {}; + throw; } OutReason.clear(); + return true; } @@ -573,6 +583,12 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveWakingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_WakingModules.erase(ModuleId); @@ -586,19 +602,22 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) if (!Instance.Wake()) { OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); return false; } + NewState = Instance.GetState(); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; - OutReason = Ex.what(); - return false; + NewState = Instance.GetState(); + Instance = {}; + throw; } OutReason.clear(); + return true; } @@ -751,6 +770,12 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) ZEN_ASSERT(Instance); + uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + auto RemoveRecoveringModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_RecoveringModules.erase(std::string(ModuleId)); @@ -762,30 +787,24 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) return; } - const uint16_t Port = RawInstance->GetBasePort(); - std::string BaseUri; // TODO? - if (Instance.RecoverFromCrash()) { + NewState = Instance.GetState(); Instance = {}; return; } // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup. - Instance.Deprovision(); - Instance = {}; - - if (m_DeprovisionedModuleCallback) + try { - try - { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = Port}); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Deprovision callback for recovered module {} failed. Reason: '{}'", ModuleId, Ex.what()); - } + (void)Instance.Deprovision(); } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); + } + NewState = Instance.GetState(); + Instance = {}; std::unique_ptr<StorageServerInstance> DestroyInstance; { @@ -798,7 +817,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); } - m_FreePorts.push_back(Port); + m_FreePorts.push_back(BasePort); m_RecoveringModules.erase(std::string(ModuleId)); } RemoveRecoveringModule.Dismiss(); @@ -806,6 +825,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) try { DestroyInstance.reset(); + NewState = HubInstanceState::Unprovisioned; } catch (const std::exception& Ex) { @@ -884,6 +904,29 @@ Hub::WatchDog() } } +void +Hub::OnStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState& NewState, + uint16_t BasePort, + std::string_view BaseUri) +{ + if (m_ModuleStateChangeCallback && OldState != NewState) + { + try + { + m_ModuleStateChangeCallback(ModuleId, + HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort}, + OldState, + NewState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what()); + } + } +} + #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); @@ -897,14 +940,44 @@ namespace hub_testutils { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); } - std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, - Hub::Configuration Config = {}, - Hub::ProvisionModuleCallbackFunc ProvisionCallback = {}, - Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {}) + std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, + Hub::Configuration Config = {}, + Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback)); + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + + struct StateChangeCapture + { + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionCallbacks; + std::vector<CallbackRecord> DeprovisionCallbacks; + + auto CaptureFunc() + { + return [this](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + ZEN_UNUSED(PreviousState); + if (NewState == HubInstanceState::Provisioned) + { + CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + else if (NewState == HubInstanceState::Unprovisioned) + { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + }; + } + }; + } // namespace hub_testutils TEST_CASE("hub.provision_basic") @@ -998,23 +1071,9 @@ TEST_CASE("hub.provision_callbacks") { ScopedTemporaryDirectory TempDir; - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionRecords; - std::vector<CallbackRecord> DeprovisionRecords; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; + hub_testutils::StateChangeCapture CaptureInstance; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb)); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); HubProvisionedInstanceInfo Info; std::string Reason; @@ -1023,11 +1082,11 @@ TEST_CASE("hub.provision_callbacks") REQUIRE_MESSAGE(ProvisionResult, Reason); { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionRecords.size(), 1u); - CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module"); - CHECK_EQ(ProvisionRecords[0].Port, Info.Port); - CHECK_NE(ProvisionRecords[0].Port, 0); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); } { @@ -1044,11 +1103,11 @@ TEST_CASE("hub.provision_callbacks") } { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(DeprovisionRecords.size(), 1u); - CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); - CHECK_NE(DeprovisionRecords[0].Port, 0); - CHECK_EQ(ProvisionRecords.size(), 1u); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); } } @@ -1158,23 +1217,9 @@ TEST_CASE("hub.concurrent_callbacks") Config.BasePortNumber = 22300; Config.InstanceLimit = 10; - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionCallbacks; - std::vector<CallbackRecord> DeprovisionCallbacks; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; + hub_testutils::StateChangeCapture CaptureInstance; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb)); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc()); constexpr int kHalf = 3; @@ -1189,9 +1234,9 @@ TEST_CASE("hub.concurrent_callbacks") CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); { - RwLock::ExclusiveLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - ProvisionCallbacks.clear(); + RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + CaptureInstance.ProvisionCallbacks.clear(); } // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. @@ -1248,17 +1293,17 @@ TEST_CASE("hub.concurrent_callbacks") // Each new_* module must have triggered exactly one provision callback with a non-zero port. // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); - for (const CallbackRecord& Record : ProvisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; CHECK_MESSAGE(IsNewModule, Record.ModuleId); } - for (const CallbackRecord& Record : DeprovisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index f880dab52..28e77e729 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -53,12 +53,13 @@ public: std::string HydrationTargetSpecification; }; - typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc; + typedef std::function< + void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState)> + AsyncModuleStateChangeCallbackFunc; - Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {}, - ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {}); + Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {}); ~Hub(); Hub(const Hub&) = delete; @@ -73,6 +74,11 @@ public: }; /** + * Deprovision all running instances + */ + void Shutdown(); + + /** * Provision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to provision. @@ -139,8 +145,7 @@ private: const Configuration m_Config; ZenServerEnvironment m_RunEnvironment; - ProvisionModuleCallbackFunc m_ProvisionedModuleCallback; - ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback; + AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; std::string m_HydrationTargetSpecification; std::filesystem::path m_HydrationTempPath; @@ -170,6 +175,42 @@ private: void UpdateStats(); void UpdateCapacityMetrics(); bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + + class InstanceStateUpdateGuard + { + public: + InstanceStateUpdateGuard(Hub& InHub, + std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState& NewState, + uint16_t BasePort, + const std::string& BaseUri) + : m_Hub(InHub) + , m_ModuleId(ModuleId) + , m_OldState(OldState) + , m_NewState(NewState) + , m_BasePort(BasePort) + , m_BaseUri(BaseUri) + { + } + ~InstanceStateUpdateGuard() { m_Hub.OnStateUpdate(m_ModuleId, m_OldState, m_NewState, m_BasePort, m_BaseUri); } + + private: + Hub& m_Hub; + const std::string m_ModuleId; + HubInstanceState m_OldState; + HubInstanceState& m_NewState; + uint16_t m_BasePort; + const std::string m_BaseUri; + }; + + void OnStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState& NewState, + uint16_t BasePort, + std::string_view BaseUri); + + friend class InstanceStateUpdateGuard; }; #if ZEN_WITH_TESTS diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 0a9efcc44..99f0c29f3 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -69,13 +69,13 @@ StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); } -void +bool StorageServerInstance::ProvisionLocked() { if (m_State.load() == HubInstanceState::Provisioned) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); - return; + return false; } if (m_State.load() == HubInstanceState::Crashed) @@ -86,39 +86,45 @@ StorageServerInstance::ProvisionLocked() if (m_State.load() == HubInstanceState::Hibernated) { - if (WakeLocked()) - { - return; - } - // Wake failed; proceed with a fresh provision (discards hibernated data) - m_State = HubInstanceState::Unprovisioned; + return WakeLocked(); } - else if (m_State.load() != HubInstanceState::Unprovisioned) + + if (m_State.load() != HubInstanceState::Unprovisioned) { ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision", m_ModuleId, ToString(m_State.load())); - return; + return false; } ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); m_State = HubInstanceState::Provisioning; - Hydrate(); - SpawnServerProcess(); - m_State = HubInstanceState::Provisioned; + try + { + Hydrate(); + SpawnServerProcess(); + m_State = HubInstanceState::Provisioned; + return true; + } + catch (...) + { + m_State = HubInstanceState::Unprovisioned; + throw; + } } -void +bool StorageServerInstance::DeprovisionLocked() { const HubInstanceState CurrentState = m_State.load(); - if (CurrentState != HubInstanceState::Provisioned && CurrentState != HubInstanceState::Crashed) + if (CurrentState != HubInstanceState::Provisioned && CurrentState != HubInstanceState::Crashed && + CurrentState != HubInstanceState::Hibernated) { ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')", m_ModuleId, ToString(CurrentState)); - return; + return false; } ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); @@ -126,13 +132,30 @@ StorageServerInstance::DeprovisionLocked() m_State = HubInstanceState::Deprovisioning; if (CurrentState == HubInstanceState::Provisioned) { - m_ServerInstance.Shutdown(); // Graceful; process may still be running + try + { + m_ServerInstance.Shutdown(); + } + catch (...) + { + m_State = HubInstanceState::Provisioned; // Shutdown failed; process may still be running + throw; + } } - // Crashed: process already dead; skip Shutdown + // Crashed or Hibernated: process already dead; skip Shutdown - Dehydrate(); + try + { + Dehydrate(); + } + catch (...) + { + m_State = HubInstanceState::Crashed; // Dehydrate failed; process is already dead + throw; + } m_State = HubInstanceState::Unprovisioned; + return true; } bool @@ -161,11 +184,10 @@ StorageServerInstance::HibernateLocked() m_State = HubInstanceState::Hibernated; return true; } - catch (const std::exception& Ex) + catch (...) { - ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running - return false; + throw; } } @@ -191,11 +213,10 @@ StorageServerInstance::WakeLocked() m_State = HubInstanceState::Provisioned; return true; } - catch (const std::exception& Ex) + catch (...) { - ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); m_State = HubInstanceState::Hibernated; - return false; + throw; } } @@ -418,18 +439,18 @@ StorageServerInstance::ExclusiveLockedPtr::IsRunning() const return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); } -void +bool StorageServerInstance::ExclusiveLockedPtr::Provision() { ZEN_ASSERT(m_Instance != nullptr); - m_Instance->ProvisionLocked(); + return m_Instance->ProvisionLocked(); } -void +bool StorageServerInstance::ExclusiveLockedPtr::Deprovision() { ZEN_ASSERT(m_Instance != nullptr); - m_Instance->DeprovisionLocked(); + return m_Instance->DeprovisionLocked(); } bool diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 9963f640e..a0ca496dc 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -68,6 +68,11 @@ public: ZEN_ASSERT(m_Instance); return m_Instance->m_State.load(); } + uint16_t GetBasePort() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->GetBasePort(); + } bool IsRunning() const; const ResourceMetrics& GetResourceMetrics() const @@ -114,6 +119,11 @@ public: ZEN_ASSERT(m_Instance); return m_Instance->m_State.load(); } + uint16_t GetBasePort() const + { + ZEN_ASSERT(m_Instance); + return m_Instance->GetBasePort(); + } bool IsRunning() const; const ResourceMetrics& GetResourceMetrics() const @@ -122,8 +132,12 @@ public: return m_Instance->m_ResourceMetrics; } - void Provision(); - void Deprovision(); + // For Provision, Deprovision, Hibernate, Wake: + // true = operation performed (state changed) + // false = precondition not met (wrong state), nothing attempted + // throws = operation attempted but failed; m_State corrected before throw + [[nodiscard]] bool Provision(); + [[nodiscard]] bool Deprovision(); [[nodiscard]] bool Hibernate(); [[nodiscard]] bool Wake(); [[nodiscard]] bool RecoverFromCrash(); // true = recovered; false = spawn failed (Crashed), caller must Deprovision() + cleanup @@ -136,8 +150,8 @@ public: [[nodiscard]] ExclusiveLockedPtr LockExclusive(bool Wait) { return ExclusiveLockedPtr(m_Lock, this, Wait); } private: - void ProvisionLocked(); - void DeprovisionLocked(); + [[nodiscard]] bool ProvisionLocked(); + [[nodiscard]] bool DeprovisionLocked(); [[nodiscard]] bool HibernateLocked(); [[nodiscard]] bool WakeLocked(); diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 45a3211b2..f9ff655ec 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -169,14 +169,22 @@ ZenHubServer::~ZenHubServer() } void -ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { - if (m_ConsulClient) + ZEN_UNUSED(PreviousState); + if (!m_ConsulClient) + { + return; + } + if (NewState == HubInstanceState::Provisioned) { consul::ServiceRegistrationInfo ServiceInfo{ - .ServiceId = std::string(ModuleId), - .ServiceName = "zen-storage", - // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri? + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", .Port = Info.Port, .HealthEndpoint = "health", .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), @@ -197,13 +205,7 @@ ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view Mod ServiceInfo.ServiceName); } } -} - -void -ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) -{ - ZEN_UNUSED(HubInstanceId); - if (m_ConsulClient) + else if (NewState == HubInstanceState::Unprovisioned) { if (!m_ConsulClient->DeregisterService(ModuleId)) { @@ -216,6 +218,8 @@ ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view M ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); } } + // Transitional states (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) + // and Hibernated are intentionally ignored. } int @@ -268,6 +272,11 @@ ZenHubServer::Cleanup() m_Http->Close(); } + if (m_Hub) + { + m_Hub->Shutdown(); + } + m_FrontendService.reset(); m_HubService.reset(); m_ApiService.reset(); @@ -306,14 +315,14 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), - m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); } - : Hub::ProvisionModuleCallbackFunc{}, - m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); } - : Hub::ProvisionModuleCallbackFunc{}); + m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); + }} + : Hub::AsyncModuleStateChangeCallbackFunc{}); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index d44a13d86..0fb192b9f 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -2,6 +2,7 @@ #pragma once +#include "hubinstancestate.h" #include "zenserver.h" #include <zenutil/consul.h> @@ -89,8 +90,11 @@ public: void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } private: - void OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info); - void OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info); + void OnModuleStateChanged(std::string_view HubInstanceId, + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState); std::filesystem::path m_DataRoot; std::filesystem::path m_ContentRoot; |