diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-27 09:51:29 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-27 09:51:29 +0100 |
| commit | e811745e5c37dd38a8fb9f4bc2892525401eabbd (patch) | |
| tree | 63896cabc0eb895887dc8247bb573f0dfd696afa /src/zenserver/hub/hub.cpp | |
| parent | hub async provision/deprovision/hibernate/wake (#891) (diff) | |
| download | zen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.tar.xz zen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.zip | |
hub instance state refactor (#892)
- Improvement: Provisioning a hibernated instance now automatically wakes it instead of requiring an explicit wake call first
- Improvement: Deprovisioning now accepts instances in Crashed or Hibernated states, not just Provisioned
- Improvement: Added `--consul-health-interval-seconds` and `--consul-deregister-after-seconds` options to control Consul health check behavior (defaults: 10s and 30s)
- Improvement: Consul registration now occurs when provisioning starts; health check intervals are applied once provisioning completes
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 987 |
1 file changed, 553 insertions, 434 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index ceba21d8d..db406947a 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -131,6 +131,8 @@ Hub::Hub(const Configuration& Config, , m_WorkerPool(OptionalWorkerPool) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) +, m_ActiveInstances(Config.InstanceLimit) +, m_FreeActiveInstanceIndexes(Config.InstanceLimit) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; @@ -153,10 +155,7 @@ Hub::Hub(const Configuration& Config, ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max()); m_InstanceLookup.reserve(Config.InstanceLimit); - m_ActiveInstances.reserve(Config.InstanceLimit); - - m_FreePorts.resize(Config.InstanceLimit); - std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber); + std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0); #if ZEN_PLATFORM_WINDOWS if (m_Config.UseJobObject) @@ -240,26 +239,11 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; - bool IsNewInstance = false; - uint16_t AllocatedPort = 0; + bool IsNewInstance = false; + size_t ActiveInstanceIndex = (size_t)-1; + HubInstanceState OldState = HubInstanceState::Unprovisioned; { RwLock::ExclusiveLockScope _(m_Lock); - auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() { - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; - } - }); - - if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end()) - { - // Same operation already in flight -- return the already-allocated port. 
- // RestoreAllocatedPort is a no-op here because IsNewInstance is still false - // (we return before line 273 where it is set), so no port is double-freed. - OutInfo.Port = It->second; - return Response{EResponseCode::Accepted}; - } if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { @@ -273,44 +257,48 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) IsNewInstance = true; - AllocatedPort = m_FreePorts.front(); - ZEN_ASSERT(AllocatedPort != 0); - m_FreePorts.pop_front(); + ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front(); + m_FreeActiveInstanceIndexes.pop_front(); + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - auto NewInstance = std::make_unique<StorageServerInstance>( - m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = AllocatedPort, - .HydrationTempPath = m_HydrationTempPath, - .HydrationTargetSpecification = m_HydrationTargetSpecification, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, - ModuleId); + try + { + auto NewInstance = std::make_unique<StorageServerInstance>( + m_RunEnvironment, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .HydrationTempPath = m_HydrationTempPath, + .HydrationTargetSpecification = m_HydrationTargetSpecification, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath}, + ModuleId); #if ZEN_PLATFORM_WINDOWS - if (m_JobObject.IsValid()) - { - NewInstance->SetJobObject(&m_JobObject); - } + if (m_JobObject.IsValid()) + { + NewInstance->SetJobObject(&m_JobObject); + } #endif - Instance = NewInstance->LockExclusive(/*Wait*/ true); + Instance = NewInstance->LockExclusive(/*Wait*/ true); - size_t ActiveInstanceIndex = (size_t)-1; - if (!m_FreeActiveInstanceIndexes.empty()) - { - 
ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back(); - m_FreeActiveInstanceIndexes.pop_back(); - ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); } - else + catch (const std::exception&) { - ActiveInstanceIndex = m_ActiveInstances.size(); - m_ActiveInstances.emplace_back(std::move(NewInstance)); + Instance = {}; + m_ActiveInstances[ActiveInstanceIndex].Instance.reset(); + m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned); + m_InstanceLookup.erase(std::string(ModuleId)); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + throw; } - ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); + + OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex); ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); @@ -322,36 +310,68 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } else { - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - if (m_RecoveringModules.contains(std::string(ModuleId))) - { - ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; - } + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); - 
std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + OutInfo.Port = InstanceRaw->GetBasePort(); + + switch (CurrentState) { - OutInfo.Port = InstanceRaw->GetBasePort(); - return Response{EResponseCode::Completed}; + case HubInstanceState::Provisioning: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Hibernated: + _.ReleaseNow(); + return Wake(std::string(ModuleId)); + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } - Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - AllocatedPort = InstanceRaw->GetBasePort(); - } + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort); + // Re-validate state after acquiring the instance lock: a concurrent Provision may have + // completed between our hub-lock read and LockExclusive, transitioning the state away + // from Crashed/Unprovisioned. 
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned) + { + Instance = {}; + if (ActualState == HubInstanceState::Provisioned) + { + return Response{EResponseCode::Completed}; + } + if (ActualState == HubInstanceState::Provisioning) + { + return Response{EResponseCode::Accepted}; + } + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))}; + } + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_ProvisioningModules tracks which modules are being provisioned, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Both hub-lock paths above set OldState and updated the state to Provisioning before + // releasing the hub lock, so concurrent operations already see the in-flight state. 
ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {}); if (m_WorkerPool) { @@ -361,13 +381,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), - AllocatedPort, + ActiveInstanceIndex, + OldState, IsNewInstance, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteProvision(*Instance, AllocatedPort, IsNewInstance); + CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance); } catch (const std::exception& Ex) { @@ -378,121 +399,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } catch (const std::exception& DispatchEx) { + // Dispatch failed: undo latch increment and roll back state. ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - if (IsNewInstance) - { - try - { - AbortProvision(ModuleId); - } - catch (const std::exception& DestroyEx) - { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); - } - } - - Instance = {}; + // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {}); + std::unique_ptr<StorageServerInstance> DestroyInstance; { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second 
== ActiveInstanceIndex); + if (IsNewInstance) { - m_FreePorts.push_back(AllocatedPort); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); } + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + DestroyInstance.reset(); throw; } } else { - CompleteProvision(Instance, AllocatedPort, IsNewInstance); + CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance); } - OutInfo.Port = AllocatedPort; - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance) +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveProvisioningModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - } - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + std::string BaseUri; // TODO? 
if (m_ShutdownFlag.load() == false) { try { - (void)Instance.Provision(); // false = already in target state (idempotent); not an error - NewState = Instance.GetState(); - AllocatedPort = 0; - Instance = {}; + switch (OldState) + { + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + Instance.Provision(); + break; + case HubInstanceState::Hibernated: + ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning + break; + default: + ZEN_ASSERT(false); + } + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri); + Instance = {}; + return; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // Instance will be notified and removed below. } } - if (Instance) + if (IsNewInstance) { - NewState = Instance.GetState(); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {}); Instance = {}; - - if (IsNewInstance) + std::unique_ptr<StorageServerInstance> DestroyInstance; { - try - { - AbortProvision(ModuleId); - NewState = HubInstanceState::Unprovisioned; - } - catch (const std::exception& DestroyEx) - { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); - } + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } + DestroyInstance.reset(); } 
-} - -void -Hub::AbortProvision(std::string_view ModuleId) -{ - std::unique_ptr<StorageServerInstance> DestroyInstance; + else { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) - { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(DestroyInstance); - ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - } - else - { - ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId); - } + // OldState = Crashed: restore without cleanup (instance stays in lookup) + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {}); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + Instance = {}; } - DestroyInstance.reset(); } Hub::Response @@ -505,30 +505,11 @@ Hub::Deprovision(const std::string& ModuleId) Hub::Response Hub::InternalDeprovision(const std::string& ModuleId) { - std::unique_ptr<StorageServerInstance> RawInstance; StorageServerInstance::ExclusiveLockedPtr Instance; - + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId)) - { - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); - - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)}; - } - - if (m_RecoveringModules.contains(ModuleId)) - { - ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId); - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; - } - - if (m_DeprovisioningModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - if (auto It = 
m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); @@ -537,41 +518,64 @@ Hub::InternalDeprovision(const std::string& ModuleId) } else { - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(RawInstance != nullptr); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort()); + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Deprovisioning: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Recovering: + // Recovering is watchdog-managed; reject to avoid interfering with the in-progress + // recovery. The watchdog will transition to Provisioned or Unprovisioned, after + // which deprovision can be retried. + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); Instance = RawInstance->LockExclusive(/*Wait*/ true); } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. 
- // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // The exclusive instance lock acquired above prevents concurrent LockExclusive callers + // from modifying instance state. The state transition to Deprovisioning happens below, + // after the hub lock is released. - ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {}); if (m_WorkerPool) { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + m_BackgroundWorkLatch.AddCount(1); try { m_WorkerPool->ScheduleWork( - [this, - ModuleId = std::string(ModuleId), - Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)), - RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable { + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance); - RawInstance.reset(); + CompleteDeprovision(*Instance, ActiveInstanceIndex); } catch (const std::exception& Ex) { @@ -582,67 +586,69 @@ Hub::InternalDeprovision(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { + // Dispatch failed: undo latch increment and roll back state. 
ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - uint16_t BasePort = Instance.GetBasePort(); - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {}); - - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - Instance = {}; - NewState = HubInstanceState::Unprovisioned; - + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteDeprovision(Instance); - RawInstance.reset(); + CompleteDeprovision(Instance, ActiveInstanceIndex); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? 
- HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto _ = MakeGuard([&] { - { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); - } - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - (void)Instance.Deprovision(); - NewState = Instance.GetState(); - Instance = {}; + Instance.Deprovision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - NewState = HubInstanceState::Unprovisioned; + // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed + // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed + // so the watchdog can attempt recovery. 
Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {}); throw; } + + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); + Instance = {}; + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DeleteInstance.reset(); } Hub::Response @@ -651,59 +657,64 @@ Hub::Hibernate(const std::string& ModuleId) ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_HibernatingModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - - if (IsModuleInFlightLocked(ModuleId)) - { - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; - } - auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { return Response{EResponseCode::NotFound}; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Hibernated) + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); 
+ + switch (CurrentState) { - return Response{EResponseCode::Completed}; + case HubInstanceState::Hibernating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Hibernated: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort()); + + // Re-validate state after acquiring the instance lock: WatchDog may have transitioned + // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above. + + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Provisioned) + { + Instance = {}; + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))}; + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_HibernatingModules tracks which modules are being hibernated, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - // Validate state while holding the exclusive instance lock (state cannot change). - // This gives a synchronous error response for invalid-state calls on both the sync - // and async paths, matching the existing behaviour. 
- if (Instance.GetState() != HubInstanceState::Provisioned) - { - const Response HibernateResp = - Response{EResponseCode::Rejected, - fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))}; - Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(ModuleId); - return HibernateResp; - } + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {}); if (m_WorkerPool) { @@ -713,11 +724,13 @@ Hub::Hibernate(const std::string& ModuleId) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteHibernate(*Instance); + CompleteHibernate(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -728,58 +741,47 @@ Hub::Hibernate(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo the latch increment and tracking-set membership. - // State has not changed so no callback is needed. - // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + // Dispatch failed: undo latch increment and roll back state. 
ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - Instance = {}; + + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(std::string(ModuleId)); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteHibernate(Instance); + CompleteHibernate(Instance, ActiveInstanceIndex, OldState); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? 
- HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveHibernatingModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(std::string(ModuleId)); - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - if (!Instance.Hibernate()) - { - ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - Instance = {}; - return; - } - NewState = Instance.GetState(); + Instance.Hibernate(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {}); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); - NewState = Instance.GetState(); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); Instance = {}; throw; } @@ -791,59 +793,62 @@ Hub::Wake(const std::string& ModuleId) ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_WakingModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - - if (IsModuleInFlightLocked(ModuleId)) - { - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; - } - auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { return Response{EResponseCode::NotFound}; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < 
m_ActiveInstances.size()); - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) { - return Response{EResponseCode::Completed}; + case HubInstanceState::Waking: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Hibernated: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_WakingModules.emplace(ModuleId, Instance.GetBasePort()); + + // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may + // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive. + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Hibernated) + { + Instance = {}; + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))}; + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_WakingModules tracks which modules are being woken, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. 
ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - // Validate state while holding the exclusive instance lock (state cannot change). - // This gives a synchronous error response for invalid-state calls on both the sync - // and async paths, matching the existing behaviour. - if (Instance.GetState() != HubInstanceState::Hibernated) - { - const Response WakeResp = - Response{EResponseCode::Rejected, - fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))}; - Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(ModuleId); - return WakeResp; - } + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {}); if (m_WorkerPool) { @@ -853,11 +858,13 @@ Hub::Wake(const std::string& ModuleId) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteWake(*Instance); + CompleteWake(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -868,58 +875,47 @@ Hub::Wake(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo the latch increment and tracking-set membership. - // State has not changed so no callback is needed. - // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + // Dispatch failed: undo latch increment and roll back state. 
ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - Instance = {}; + + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(std::string(ModuleId)); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteWake(Instance); + CompleteWake(Instance, ActiveInstanceIndex, OldState); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? 
- HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveWakingModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(std::string(ModuleId)); - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - if (!Instance.Wake()) - { - ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - Instance = {}; - return; - } - NewState = Instance.GetState(); + Instance.Wake(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {}); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); - NewState = Instance.GetState(); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); Instance = {}; throw; } @@ -935,10 +931,10 @@ Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); InstanceInfo Info{ - Instance->GetState(), + m_ActiveInstances[ActiveInstanceIndex].State.load(), std::chrono::system_clock::now() // TODO }; Instance->GetProcessMetrics(Info.Metrics); @@ -959,10 +955,10 @@ Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const Instan RwLock::SharedLockScope _(m_Lock); for (auto& [ModuleId, 
ActiveInstanceIndex] : m_InstanceLookup) { - const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); InstanceInfo Info{ - Instance->GetState(), + m_ActiveInstances[ActiveInstanceIndex].State.load(), std::chrono::system_clock::now() // TODO }; Instance->GetProcessMetrics(Info.Metrics); @@ -1007,152 +1003,159 @@ Hub::UpdateStats() bool Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) { - if (m_DeprovisioningModules.contains(std::string(ModuleId))) - { - OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); - - return false; - } - - if (m_ProvisioningModules.contains(std::string(ModuleId))) - { - OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); - - return false; - } - - if (gsl::narrow_cast<int>(m_InstanceLookup.size()) >= m_Config.InstanceLimit) + ZEN_UNUSED(ModuleId); + if (m_FreeActiveInstanceIndexes.empty()) { OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); return false; } - // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below - // the instance count limit but with no free ports available - if (m_FreePorts.empty()) - { - OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})", - m_Config.InstanceLimit - m_InstanceLookup.size()); - - return false; - } - // TODO: handle additional resource metrics return true; } -bool -Hub::IsModuleInFlightLocked(std::string_view ModuleId) const +uint16_t +Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const +{ + return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex); +} + +HubInstanceState +Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState) { - 
const std::string Key(ModuleId); - return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) || - m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key); + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) { + switch (From) + { + case HubInstanceState::Unprovisioned: + return To == HubInstanceState::Provisioning; + case HubInstanceState::Provisioned: + return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed; + case HubInstanceState::Hibernated: + return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning; + case HubInstanceState::Crashed: + return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering; + case HubInstanceState::Provisioning: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; + case HubInstanceState::Hibernating: + return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned; + case HubInstanceState::Waking: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated; + case HubInstanceState::Deprovisioning: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated || + To == HubInstanceState::Crashed; + case HubInstanceState::Recovering: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned; + } + return false; + }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); + return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState); } void Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - 
if (IsModuleInFlightLocked(ModuleId)) - { - return; - } - auto It = m_InstanceLookup.find(std::string(ModuleId)); if (It == m_InstanceLookup.end()) { return; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (CurrentState != HubInstanceState::Crashed) + { + return; + } - // Definitive check while hub lock is held: exclusive instance lock is also held, - // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision) - // or the process restarted since the watchdog fired. - if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) + Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false); + if (!Instance) { + // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed. return; } - m_RecoveringModules.emplace(std::string(ModuleId)); + ZEN_ASSERT(!Instance.IsRunning()); + + (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering); } ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering); - const uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? 
- HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; + NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {}); - auto RemoveRecoveringModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_RecoveringModules.erase(std::string(ModuleId)); - }); - - if (Instance.RecoverFromCrash()) - { - // Spawn succeeded -- instance is back in Provisioned state. - NewState = Instance.GetState(); - Instance = {}; - OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); - return; - } - - // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down - // so any salvageable data is preserved. + // Dehydrate before trying to recover so any salvageable data is preserved. try { - (void)Instance.Deprovision(); + Instance.Deprovision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); - } - Instance = {}; - - std::unique_ptr<StorageServerInstance> DestroyInstance; - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); + + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); 
m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } - m_FreePorts.push_back(BasePort); - m_RecoveringModules.erase(std::string(ModuleId)); + DestroyInstance.reset(); + return; } - RemoveRecoveringModule.Dismiss(); try { - DestroyInstance.reset(); - NewState = HubInstanceState::Unprovisioned; + Instance.Provision(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; } catch (const std::exception& Ex) { - ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); - } + ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what()); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); - // Notify after all cleanup -- port is back in m_FreePorts and the callback sees - // a consistent end-state: module gone, transition complete. 
- OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DestroyInstance.reset(); + return; + } } void Hub::WatchDog() { - constexpr uint64_t WatchDogWakeupTimeMs = 5000; + constexpr uint64_t WatchDogWakeupTimeMs = 3000; constexpr uint64_t WatchDogProcessingTimeMs = 500; size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 @@ -1179,7 +1182,7 @@ Hub::WatchDog() { CheckInstanceIndex = 0; } - StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].get(); + StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get(); if (Instance) { LockedInstance = Instance->LockShared(/*Wait*/ false); @@ -1198,14 +1201,19 @@ Hub::WatchDog() { LockedInstance.UpdateMetrics(); } - else if (LockedInstance.GetState() == HubInstanceState::Provisioned) + else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned) { // Process is not running but state says it should be - instance died unexpectedly. const std::string ModuleId(LockedInstance.GetModuleId()); + const uint16_t Port = LockedInstance.GetBasePort(); + UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); LockedInstance = {}; AttemptRecoverInstance(ModuleId); } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) - expected, skip. + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. 
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance + lock was busy on a previous cycle. NOTE(review): in that case nothing re-triggers recovery — the watchdog only invokes AttemptRecoverInstance on the Provisioned -> Crashed transition — so confirm a retry path actually exists for a still-Crashed instance. LockedInstance = {}; // Rate-limit: pause briefly between live-instance checks and respond to shutdown. @@ -1221,11 +1229,11 @@ } void -Hub::OnStateUpdate(std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - std::string_view BaseUri) +Hub::NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri) { if (m_ModuleStateChangeCallback && OldState != NewState) { @@ -1295,6 +1303,42 @@ namespace hub_testutils { } }; + // Poll until Find() returns false for the given module (i.e. async deprovision completes). + static bool WaitForInstanceGone(Hub& HubInstance, + std::string_view ModuleId, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (!HubInstance.Find(ModuleId)) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return !HubInstance.Find(ModuleId); + } + + // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete). 
+ static bool WaitForInstanceCount(Hub& HubInstance, + int ExpectedCount, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance.GetInstanceCount() == ExpectedCount) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return HubInstance.GetInstanceCount() == ExpectedCount; + } + } // namespace hub_testutils TEST_CASE("hub.provision_basic") @@ -1423,6 +1467,49 @@ TEST_CASE("hub.provision_callbacks") } } +TEST_CASE("hub.provision_callback_sequence") +{ + ScopedTemporaryDirectory TempDir; + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + ZEN_UNUSED(ModuleId); + ZEN_UNUSED(Info); + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); + + HubProvisionedInstanceInfo Info; + { + const Hub::Response R = HubInstance->Provision("seq_module", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Deprovision("seq_module"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, 
HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); +} + TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; @@ -1851,18 +1938,30 @@ TEST_CASE("hub.async_hibernate_wake") CHECK(ModClient.Get("/health/")); } - // Deprovision + // Deprovision asynchronously and poll until the instance is gone { const Hub::Response R = HubInstance->Deprovision("async_hib_a"); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } - CHECK_FALSE(HubInstance->Find("async_hib_a")); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout"); } TEST_CASE("hub.recover_process_crash") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); HubProvisionedInstanceInfo Info; { @@ -1896,6 +1995,26 @@ TEST_CASE("hub.recover_process_crash") } } CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); + + // Verify the full crash/recovery callback sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_GE(Transitions.size(), 3u); + // Find the Provisioned->Crashed transition + const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const 
TransitionRecord& R) { + return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; + }); + REQUIRE_NE(CrashedIt, Transitions.end()); + // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned + const auto RecoveringIt = CrashedIt + 1; + REQUIRE_NE(RecoveringIt, Transitions.end()); + CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); + CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering); + const auto RecoveredIt = RecoveringIt + 1; + REQUIRE_NE(RecoveredIt, Transitions.end()); + CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); + CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); + } } TEST_CASE("hub.recover_process_crash_then_deprovision") @@ -2025,7 +2144,7 @@ TEST_CASE("hub.async_provision_concurrent") const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); } - CHECK_EQ(HubInstance->GetInstanceCount(), 0); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout"); } TEST_CASE("hub.async_provision_shutdown_waits") @@ -2101,7 +2220,7 @@ Hub::TerminateModuleForTesting(const std::string& ModuleId) { return; } - StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second]->LockShared(/*Wait*/ true); + StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true); if (Locked) { Locked.TerminateForTesting(); |