diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-24 21:34:54 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-24 22:56:01 +0100 |
| commit | a8c052637b9b2f705413c5c994c83713b2f14231 (patch) | |
| tree | fb948931a33f8bb7ce2798b2c5fbed80b7bd9953 | |
| parent | use fixed size struct for hub instances (diff) | |
| download | zen-de/hub-instance-state-refactor.tar.xz zen-de/hub-instance-state-refactor.zip | |
remove m_FreePorts management, use Instance index as basis for port insteadde/hub-instance-state-refactor
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 217 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 8 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 32 |
3 files changed, 105 insertions, 152 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 8c8a98322..0674fe7bb 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -157,9 +157,6 @@ Hub::Hub(const Configuration& Config, m_InstanceLookup.reserve(Config.InstanceLimit); std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0); - m_FreePorts.resize(Config.InstanceLimit); - std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber); - #if ZEN_PLATFORM_WINDOWS if (m_Config.UseJobObject) { @@ -243,17 +240,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; bool IsNewInstance = false; - uint16_t AllocatedPort = 0; { RwLock::ExclusiveLockScope _(m_Lock); - auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() { - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; - } - }); - if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end()) { // Same operation already in flight -- return the already-allocated port. @@ -275,13 +263,13 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) IsNewInstance = true; - AllocatedPort = m_FreePorts.front(); - ZEN_ASSERT(AllocatedPort != 0); - m_FreePorts.pop_front(); + size_t ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front(); + m_FreeActiveInstanceIndexes.pop_front(); + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = AllocatedPort, + StorageServerInstance::Configuration{.BasePort = gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex), .HydrationTempPath = m_HydrationTempPath, .HydrationTargetSpecification = m_HydrationTargetSpecification, .HttpThreadCount = m_Config.InstanceHttpThreadCount, @@ -298,11 +286,6 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) Instance = NewInstance->LockExclusive(/*Wait*/ true); - size_t ActiveInstanceIndex = (size_t)-1; - ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back(); - m_FreeActiveInstanceIndexes.pop_back(); - ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); @@ -316,15 +299,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } else { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - if (m_RecoveringModules.contains(std::string(ModuleId))) { ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; } + size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(InstanceRaw); @@ -334,11 +317,10 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) return Response{EResponseCode::Completed}; } - Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - AllocatedPort = InstanceRaw->GetBasePort(); + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); } - m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort); + m_ProvisioningModules.emplace(std::string(ModuleId), Instance.GetBasePort()); } // NOTE: done while not holding the hub lock, to avoid blocking other operations. @@ -347,6 +329,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) ZEN_ASSERT(Instance); + OutInfo.Port = Instance.GetBasePort(); + if (m_WorkerPool) { m_BackgroundWorkLatch.AddCount(1); @@ -355,13 +339,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), - AllocatedPort, IsNewInstance, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteProvision(*Instance, AllocatedPort, IsNewInstance); + CompleteProvision(*Instance, IsNewInstance); } catch (const std::exception& Ex) { @@ -392,10 +375,6 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - } } throw; @@ -403,16 +382,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } else { - CompleteProvision(Instance, AllocatedPort, IsNewInstance); + CompleteProvision(Instance, IsNewInstance); } - OutInfo.Port = AllocatedPort; - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance) +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, bool IsNewInstance) { const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); @@ -424,10 +401,6 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint auto RemoveProvisioningModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - } }); if (m_ShutdownFlag.load() == false) @@ -435,9 +408,8 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint try { (void)Instance.Provision(); // false = already in target state (idempotent); not an error - NewState = Instance.GetState(); - AllocatedPort = 0; - Instance = {}; + NewState = Instance.GetState(); + Instance = {}; } catch (const std::exception& Ex) { @@ -498,7 +470,6 @@ Hub::Deprovision(const std::string& ModuleId) Hub::Response Hub::InternalDeprovision(const std::string& ModuleId) { - std::unique_ptr<StorageServerInstance> RawInstance; StorageServerInstance::ExclusiveLockedPtr Instance; { @@ -532,11 +503,9 @@ Hub::InternalDeprovision(const std::string& ModuleId) { const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(RawInstance != nullptr); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort()); Instance = RawInstance->LockExclusive(/*Wait*/ true); @@ -547,24 +516,25 @@ Hub::InternalDeprovision(const std::string& ModuleId) // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. - ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); if (m_WorkerPool) { + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + uint16_t BasePort = Instance.GetBasePort(); + + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); m_BackgroundWorkLatch.AddCount(1); try { m_WorkerPool->ScheduleWork( - [this, - ModuleId = std::string(ModuleId), - Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)), - RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable { + [this, ModuleId = std::string(ModuleId), Instance = std::move(SharedInstancePtr)]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { CompleteDeprovision(*Instance); - RawInstance.reset(); } catch (const std::exception& Ex) { @@ -578,19 +548,10 @@ Hub::InternalDeprovision(const std::string& ModuleId) ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - uint16_t BasePort = Instance.GetBasePort(); InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {}); - - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - Instance = {}; - NewState = HubInstanceState::Unprovisioned; - { RwLock::ExclusiveLockScope _(m_Lock); m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); } throw; } @@ -598,7 +559,6 @@ Hub::InternalDeprovision(const std::string& ModuleId) else { CompleteDeprovision(Instance); - RawInstance.reset(); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; @@ -614,14 +574,6 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) HubInstanceState NewState = OldState; InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - auto _ = MakeGuard([&] { - { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); - } - }); - try { (void)Instance.Deprovision(); @@ -631,11 +583,21 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - NewState = HubInstanceState::Unprovisioned; + NewState = Instance.GetState(); Instance = {}; - throw; } + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(std::string(ModuleId)); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT(It != m_InstanceLookup.end()); + DeleteInstance = std::move(m_ActiveInstances[It->second].Instance); + m_FreeActiveInstanceIndexes.push_back(It->second); + m_InstanceLookup.erase(It); + } + DeleteInstance.reset(); } Hub::Response @@ -1021,16 +983,6 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return false; } - // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below - // the instance count limit but with no free ports available - if (m_FreePorts.empty()) - { - OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})", - m_Config.InstanceLimit - m_InstanceLookup.size()); - - return false; - } - // TODO: handle additional resource metrics return true; @@ -1074,6 +1026,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { return; } + ZEN_ASSERT(!m_RecoveringModules.contains(std::string(ModuleId))); m_RecoveringModules.emplace(std::string(ModuleId)); } @@ -1085,59 +1038,37 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) HubInstanceState OldState = Instance.GetState(); HubInstanceState NewState = OldState; - auto RemoveRecoveringModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_RecoveringModules.erase(std::string(ModuleId)); - }); - - if (Instance.RecoverFromCrash()) - { - // Spawn succeeded -- instance is back in Provisioned state. - NewState = Instance.GetState(); - Instance = {}; - OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); - return; - } - - // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down - // so any salvageable data is preserved. + // Instance is still crashed. Dehydrate before tearing down so any salvageable data is preserved. try { (void)Instance.Deprovision(); + Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); + Instance = {}; } - Instance = {}; std::unique_ptr<StorageServerInstance> DestroyInstance; { RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) - { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - } - m_FreePorts.push_back(BasePort); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT(It != m_InstanceLookup.end()); + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); m_RecoveringModules.erase(std::string(ModuleId)); } - RemoveRecoveringModule.Dismiss(); - try - { - DestroyInstance.reset(); - NewState = HubInstanceState::Unprovisioned; - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); - } + DestroyInstance.reset(); - // Notify after all cleanup -- port is back in m_FreePorts and the callback sees + NewState = HubInstanceState::Unprovisioned; + + // Notify after all cleanup // a consistent end-state: module gone, transition complete. OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); } @@ -1288,6 +1219,42 @@ namespace hub_testutils { } }; + // Poll until Find() returns false for the given module (i.e. async deprovision completes). + static bool WaitForInstanceGone(Hub& HubInstance, + std::string_view ModuleId, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (!HubInstance.Find(ModuleId)) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return !HubInstance.Find(ModuleId); + } + + // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete). + static bool WaitForInstanceCount(Hub& HubInstance, + int ExpectedCount, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance.GetInstanceCount() == ExpectedCount) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return HubInstance.GetInstanceCount() == ExpectedCount; + } + } // namespace hub_testutils TEST_CASE("hub.provision_basic") @@ -1844,12 +1811,12 @@ TEST_CASE("hub.async_hibernate_wake") CHECK(ModClient.Get("/health/")); } - // Deprovision + // Deprovision asynchronously and poll until the instance is gone { const Hub::Response R = HubInstance->Deprovision("async_hib_a"); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } - CHECK_FALSE(HubInstance->Find("async_hib_a")); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout"); } TEST_CASE("hub.recover_process_crash") @@ -2018,7 +1985,7 @@ TEST_CASE("hub.async_provision_concurrent") const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); } - CHECK_EQ(HubInstance->GetInstanceCount(), 0); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout"); } TEST_CASE("hub.async_provision_shutdown_waits") diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index c94612621..a3f85a8c5 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -176,15 +176,15 @@ private: struct ActiveInstance { std::unique_ptr<StorageServerInstance> Instance; - std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; + // std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; // TODO: Move state from StorageServerInstance to + // here }; std::vector<ActiveInstance> m_ActiveInstances; - std::vector<size_t> m_FreeActiveInstanceIndexes; + std::deque<size_t> m_FreeActiveInstanceIndexes; ResourceMetrics m_ResourceLimits; SystemMetrics m_HostMetrics; std::atomic<int> m_MaxInstanceCount = 0; - std::deque<uint16_t> m_FreePorts; std::thread m_WatchDog; Event m_WatchDogEvent; @@ -197,7 +197,7 @@ private: bool IsModuleInFlightLocked(std::string_view ModuleId) const; Response InternalDeprovision(const std::string& ModuleId); - void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance); + void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, bool IsNewInstance); void AbortProvision(std::string_view ModuleId); void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance); void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance); diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 99f0c29f3..4710aba1e 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -132,15 +132,8 @@ StorageServerInstance::DeprovisionLocked() m_State = HubInstanceState::Deprovisioning; if (CurrentState == HubInstanceState::Provisioned) { - try - { - m_ServerInstance.Shutdown(); - } - catch (...) - { - m_State = HubInstanceState::Provisioned; // Shutdown failed; process may still be running - throw; - } + // m_ServerInstance.Shutdown() never throws unless there is a programming error (ZEN_ASSERT) + m_ServerInstance.Shutdown(); } // Crashed or Hibernated: process already dead; skip Shutdown @@ -148,10 +141,9 @@ StorageServerInstance::DeprovisionLocked() { Dehydrate(); } - catch (...) + catch (const std::exception& Ex) { - m_State = HubInstanceState::Crashed; // Dehydrate failed; process is already dead - throw; + ZEN_WARN("Dehydration of module {} failed, current state not saved. Reason: {}", m_ModuleId, Ex.what()); } m_State = HubInstanceState::Unprovisioned; @@ -178,17 +170,11 @@ StorageServerInstance::HibernateLocked() } m_State = HubInstanceState::Hibernating; - try - { - m_ServerInstance.Shutdown(); - m_State = HubInstanceState::Hibernated; - return true; - } - catch (...) - { - m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running - throw; - } + + // m_ServerInstance.Shutdown() never throws unless there is a programming error (ZEN_ASSERT) + m_ServerInstance.Shutdown(); + m_State = HubInstanceState::Hibernated; + return true; } bool |