diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-27 09:51:29 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-27 09:51:29 +0100 |
| commit | e811745e5c37dd38a8fb9f4bc2892525401eabbd (patch) | |
| tree | 63896cabc0eb895887dc8247bb573f0dfd696afa /src/zenserver | |
| parent | hub async provision/deprovision/hibernate/wake (#891) (diff) | |
| download | zen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.tar.xz zen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.zip | |
hub instance state refactor (#892)
- Improvement: Provisioning a hibernated instance now automatically wakes it instead of requiring an explicit wake call first
- Improvement: Deprovisioning now accepts instances in Crashed or Hibernated states, not just Provisioned
- Improvement: Added `--consul-health-interval-seconds` and `--consul-deregister-after-seconds` options to control Consul health check behavior (defaults: 10s and 30s)
- Improvement: Consul registration now occurs when provisioning starts; health check intervals are applied once provisioning completes
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 987 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 125 | ||||
| -rw-r--r-- | src/zenserver/hub/hubinstancestate.h | 21 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 174 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 43 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 33 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 20 |
7 files changed, 721 insertions, 682 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index ceba21d8d..db406947a 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -131,6 +131,8 @@ Hub::Hub(const Configuration& Config, , m_WorkerPool(OptionalWorkerPool) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) +, m_ActiveInstances(Config.InstanceLimit) +, m_FreeActiveInstanceIndexes(Config.InstanceLimit) { m_HostMetrics = GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; @@ -153,10 +155,7 @@ Hub::Hub(const Configuration& Config, ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max()); m_InstanceLookup.reserve(Config.InstanceLimit); - m_ActiveInstances.reserve(Config.InstanceLimit); - - m_FreePorts.resize(Config.InstanceLimit); - std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber); + std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0); #if ZEN_PLATFORM_WINDOWS if (m_Config.UseJobObject) @@ -240,26 +239,11 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; - bool IsNewInstance = false; - uint16_t AllocatedPort = 0; + bool IsNewInstance = false; + size_t ActiveInstanceIndex = (size_t)-1; + HubInstanceState OldState = HubInstanceState::Unprovisioned; { RwLock::ExclusiveLockScope _(m_Lock); - auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() { - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; - } - }); - - if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end()) - { - // Same operation already in flight -- return the already-allocated port. - // RestoreAllocatedPort is a no-op here because IsNewInstance is still false - // (we return before line 273 where it is set), so no port is double-freed. - OutInfo.Port = It->second; - return Response{EResponseCode::Accepted}; - } if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { @@ -273,44 +257,48 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) IsNewInstance = true; - AllocatedPort = m_FreePorts.front(); - ZEN_ASSERT(AllocatedPort != 0); - m_FreePorts.pop_front(); + ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front(); + m_FreeActiveInstanceIndexes.pop_front(); + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - auto NewInstance = std::make_unique<StorageServerInstance>( - m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = AllocatedPort, - .HydrationTempPath = m_HydrationTempPath, - .HydrationTargetSpecification = m_HydrationTargetSpecification, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, - ModuleId); + try + { + auto NewInstance = std::make_unique<StorageServerInstance>( + m_RunEnvironment, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .HydrationTempPath = m_HydrationTempPath, + .HydrationTargetSpecification = m_HydrationTargetSpecification, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath}, + ModuleId); #if ZEN_PLATFORM_WINDOWS - if (m_JobObject.IsValid()) - { - NewInstance->SetJobObject(&m_JobObject); - } + if (m_JobObject.IsValid()) + { + NewInstance->SetJobObject(&m_JobObject); + } #endif - Instance = NewInstance->LockExclusive(/*Wait*/ true); + Instance = NewInstance->LockExclusive(/*Wait*/ true); - size_t ActiveInstanceIndex = (size_t)-1; - if (!m_FreeActiveInstanceIndexes.empty()) - { - ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back(); - m_FreeActiveInstanceIndexes.pop_back(); - ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); } - else + catch (const std::exception&) { - ActiveInstanceIndex = m_ActiveInstances.size(); - m_ActiveInstances.emplace_back(std::move(NewInstance)); + Instance = {}; + m_ActiveInstances[ActiveInstanceIndex].Instance.reset(); + m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned); + m_InstanceLookup.erase(std::string(ModuleId)); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + throw; } - ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); + + OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex); ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); @@ -322,36 +310,68 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } else { - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); - if (m_RecoveringModules.contains(std::string(ModuleId))) - { - ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; - } + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + OutInfo.Port = InstanceRaw->GetBasePort(); + + switch (CurrentState) { - OutInfo.Port = InstanceRaw->GetBasePort(); - return Response{EResponseCode::Completed}; + case HubInstanceState::Provisioning: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Hibernated: + _.ReleaseNow(); + return Wake(std::string(ModuleId)); + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } - Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - AllocatedPort = InstanceRaw->GetBasePort(); - } + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort); + // Re-validate state after acquiring the instance lock: a concurrent Provision may have + // completed between our hub-lock read and LockExclusive, transitioning the state away + // from Crashed/Unprovisioned. + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned) + { + Instance = {}; + if (ActualState == HubInstanceState::Provisioned) + { + return Response{EResponseCode::Completed}; + } + if (ActualState == HubInstanceState::Provisioning) + { + return Response{EResponseCode::Accepted}; + } + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))}; + } + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_ProvisioningModules tracks which modules are being provisioned, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Both hub-lock paths above set OldState and updated the state to Provisioning before + // releasing the hub lock, so concurrent operations already see the in-flight state. ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {}); if (m_WorkerPool) { @@ -361,13 +381,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), - AllocatedPort, + ActiveInstanceIndex, + OldState, IsNewInstance, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteProvision(*Instance, AllocatedPort, IsNewInstance); + CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance); } catch (const std::exception& Ex) { @@ -378,121 +399,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) } catch (const std::exception& DispatchEx) { + // Dispatch failed: undo latch increment and roll back state. ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - if (IsNewInstance) - { - try - { - AbortProvision(ModuleId); - } - catch (const std::exception& DestroyEx) - { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); - } - } - - Instance = {}; + // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {}); + std::unique_ptr<StorageServerInstance> DestroyInstance; { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + if (IsNewInstance) { - m_FreePorts.push_back(AllocatedPort); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); } + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + DestroyInstance.reset(); throw; } } else { - CompleteProvision(Instance, AllocatedPort, IsNewInstance); + CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance); } - OutInfo.Port = AllocatedPort; - return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance) +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveProvisioningModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) - { - m_FreePorts.push_back(AllocatedPort); - } - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + std::string BaseUri; // TODO? if (m_ShutdownFlag.load() == false) { try { - (void)Instance.Provision(); // false = already in target state (idempotent); not an error - NewState = Instance.GetState(); - AllocatedPort = 0; - Instance = {}; + switch (OldState) + { + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + Instance.Provision(); + break; + case HubInstanceState::Hibernated: + ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning + break; + default: + ZEN_ASSERT(false); + } + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri); + Instance = {}; + return; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // Instance will be notified and removed below. } } - if (Instance) + if (IsNewInstance) { - NewState = Instance.GetState(); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {}); Instance = {}; - - if (IsNewInstance) + std::unique_ptr<StorageServerInstance> DestroyInstance; { - try - { - AbortProvision(ModuleId); - NewState = HubInstanceState::Unprovisioned; - } - catch (const std::exception& DestroyEx) - { - ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); - } + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } + DestroyInstance.reset(); } -} - -void -Hub::AbortProvision(std::string_view ModuleId) -{ - std::unique_ptr<StorageServerInstance> DestroyInstance; + else { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) - { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(DestroyInstance); - ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - } - else - { - ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId); - } + // OldState = Crashed: restore without cleanup (instance stays in lookup) + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {}); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + Instance = {}; } - DestroyInstance.reset(); } Hub::Response @@ -505,30 +505,11 @@ Hub::Deprovision(const std::string& ModuleId) Hub::Response Hub::InternalDeprovision(const std::string& ModuleId) { - std::unique_ptr<StorageServerInstance> RawInstance; StorageServerInstance::ExclusiveLockedPtr Instance; - + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId)) - { - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); - - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)}; - } - - if (m_RecoveringModules.contains(ModuleId)) - { - ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId); - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; - } - - if (m_DeprovisioningModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); @@ -537,41 +518,64 @@ Hub::InternalDeprovision(const std::string& ModuleId) } else { - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(RawInstance != nullptr); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort()); + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Deprovisioning: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Recovering: + // Recovering is watchdog-managed; reject to avoid interfering with the in-progress + // recovery. The watchdog will transition to Provisioned or Unprovisioned, after + // which deprovision can be retried. + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); Instance = RawInstance->LockExclusive(/*Wait*/ true); } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // The exclusive instance lock acquired above prevents concurrent LockExclusive callers + // from modifying instance state. The state transition to Deprovisioning happens below, + // after the hub lock is released. - ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {}); if (m_WorkerPool) { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + m_BackgroundWorkLatch.AddCount(1); try { m_WorkerPool->ScheduleWork( - [this, - ModuleId = std::string(ModuleId), - Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)), - RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable { + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance); - RawInstance.reset(); + CompleteDeprovision(*Instance, ActiveInstanceIndex); } catch (const std::exception& Ex) { @@ -582,67 +586,69 @@ Hub::InternalDeprovision(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { + // Dispatch failed: undo latch increment and roll back state. ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - uint16_t BasePort = Instance.GetBasePort(); - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {}); - - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - Instance = {}; - NewState = HubInstanceState::Unprovisioned; - + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteDeprovision(Instance); - RawInstance.reset(); + CompleteDeprovision(Instance, ActiveInstanceIndex); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto _ = MakeGuard([&] { - { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(std::string(ModuleId)); - m_FreePorts.push_back(BasePort); - } - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - (void)Instance.Deprovision(); - NewState = Instance.GetState(); - Instance = {}; + Instance.Deprovision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); - // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. - NewState = HubInstanceState::Unprovisioned; + // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed + // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed + // so the watchdog can attempt recovery. Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {}); throw; } + + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); + Instance = {}; + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DeleteInstance.reset(); } Hub::Response @@ -651,59 +657,64 @@ Hub::Hibernate(const std::string& ModuleId) ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_HibernatingModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - - if (IsModuleInFlightLocked(ModuleId)) - { - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; - } - auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { return Response{EResponseCode::NotFound}; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Hibernated) + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) { - return Response{EResponseCode::Completed}; + case HubInstanceState::Hibernating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Hibernated: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort()); + + // Re-validate state after acquiring the instance lock: WatchDog may have transitioned + // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above. + + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Provisioned) + { + Instance = {}; + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))}; + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_HibernatingModules tracks which modules are being hibernated, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - // Validate state while holding the exclusive instance lock (state cannot change). - // This gives a synchronous error response for invalid-state calls on both the sync - // and async paths, matching the existing behaviour. - if (Instance.GetState() != HubInstanceState::Provisioned) - { - const Response HibernateResp = - Response{EResponseCode::Rejected, - fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))}; - Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(ModuleId); - return HibernateResp; - } + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {}); if (m_WorkerPool) { @@ -713,11 +724,13 @@ Hub::Hibernate(const std::string& ModuleId) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteHibernate(*Instance); + CompleteHibernate(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -728,58 +741,47 @@ Hub::Hibernate(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo the latch increment and tracking-set membership. - // State has not changed so no callback is needed. - // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + // Dispatch failed: undo latch increment and roll back state. ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - Instance = {}; + + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(std::string(ModuleId)); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteHibernate(Instance); + CompleteHibernate(Instance, ActiveInstanceIndex, OldState); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveHibernatingModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(std::string(ModuleId)); - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - if (!Instance.Hibernate()) - { - ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - Instance = {}; - return; - } - NewState = Instance.GetState(); + Instance.Hibernate(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {}); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); - NewState = Instance.GetState(); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); Instance = {}; throw; } @@ -791,59 +793,62 @@ Hub::Wake(const std::string& ModuleId) ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_WakingModules.contains(ModuleId)) - { - return Response{EResponseCode::Accepted}; - } - - if (IsModuleInFlightLocked(ModuleId)) - { - return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; - } - auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { return Response{EResponseCode::NotFound}; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - ZEN_ASSERT(InstanceRaw); - if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) { - return Response{EResponseCode::Completed}; + case HubInstanceState::Waking: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Hibernated: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - m_WakingModules.emplace(ModuleId, Instance.GetBasePort()); + + // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may + // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive. + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Hibernated) + { + Instance = {}; + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))}; + } } // NOTE: done while not holding the hub lock, to avoid blocking other operations. - // m_WakingModules tracks which modules are being woken, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - // Validate state while holding the exclusive instance lock (state cannot change). - // This gives a synchronous error response for invalid-state calls on both the sync - // and async paths, matching the existing behaviour. - if (Instance.GetState() != HubInstanceState::Hibernated) - { - const Response WakeResp = - Response{EResponseCode::Rejected, - fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))}; - Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(ModuleId); - return WakeResp; - } + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {}); if (m_WorkerPool) { @@ -853,11 +858,13 @@ Hub::Wake(const std::string& ModuleId) m_WorkerPool->ScheduleWork( [this, ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteWake(*Instance); + CompleteWake(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -868,58 +875,47 @@ Hub::Wake(const std::string& ModuleId) } catch (const std::exception& DispatchEx) { - // Dispatch failed: undo the latch increment and tracking-set membership. - // State has not changed so no callback is needed. - // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + // Dispatch failed: undo latch increment and roll back state. ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what()); m_BackgroundWorkLatch.CountDown(); - Instance = {}; + + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); { - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(std::string(ModuleId)); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } + throw; } } else { - CompleteWake(Instance); + CompleteWake(Instance, ActiveInstanceIndex, OldState); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance) +Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { - const std::string ModuleId(Instance.GetModuleId()); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - - auto RemoveWakingModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(std::string(ModuleId)); - }); + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - if (!Instance.Wake()) - { - ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - Instance = {}; - return; - } - NewState = Instance.GetState(); + Instance.Wake(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {}); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); - NewState = Instance.GetState(); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); Instance = {}; throw; } @@ -935,10 +931,10 @@ Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); InstanceInfo Info{ - Instance->GetState(), + m_ActiveInstances[ActiveInstanceIndex].State.load(), std::chrono::system_clock::now() // TODO }; Instance->GetProcessMetrics(Info.Metrics); @@ -959,10 +955,10 @@ Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const Instan RwLock::SharedLockScope _(m_Lock); for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) { - const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex]; + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; ZEN_ASSERT(Instance); InstanceInfo Info{ - Instance->GetState(), + m_ActiveInstances[ActiveInstanceIndex].State.load(), std::chrono::system_clock::now() // TODO }; Instance->GetProcessMetrics(Info.Metrics); @@ -1007,152 +1003,159 @@ Hub::UpdateStats() bool Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) { - if (m_DeprovisioningModules.contains(std::string(ModuleId))) - { - OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); - - return false; - } - - if (m_ProvisioningModules.contains(std::string(ModuleId))) - { - OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); - - return false; - } - - if (gsl::narrow_cast<int>(m_InstanceLookup.size()) >= m_Config.InstanceLimit) + ZEN_UNUSED(ModuleId); + if (m_FreeActiveInstanceIndexes.empty()) { OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); return false; } - // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below - // the instance count limit but with no free ports available - if (m_FreePorts.empty()) - { - OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})", - m_Config.InstanceLimit - m_InstanceLookup.size()); - - return false; - } - // TODO: handle additional resource metrics return true; } -bool -Hub::IsModuleInFlightLocked(std::string_view ModuleId) const +uint16_t +Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const +{ + return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex); +} + +HubInstanceState +Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState) { - const std::string Key(ModuleId); - return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) || - m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key); + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) { + switch (From) + { + case HubInstanceState::Unprovisioned: + return To == HubInstanceState::Provisioning; + case HubInstanceState::Provisioned: + return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed; + case HubInstanceState::Hibernated: + return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning; + case HubInstanceState::Crashed: + return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering; + case HubInstanceState::Provisioning: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; + case HubInstanceState::Hibernating: + return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned; + case HubInstanceState::Waking: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated; + case HubInstanceState::Deprovisioning: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated || + To == HubInstanceState::Crashed; + case HubInstanceState::Recovering: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned; + } + return false; + }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); + return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState); } void Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (IsModuleInFlightLocked(ModuleId)) - { - return; - } - auto It = m_InstanceLookup.find(std::string(ModuleId)); if (It == m_InstanceLookup.end()) { return; } - const size_t ActiveInstanceIndex = It->second; + ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (CurrentState != HubInstanceState::Crashed) + { + return; + } - // Definitive check while hub lock is held: exclusive instance lock is also held, - // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision) - // or the process restarted since the watchdog fired. - if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) + Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false); + if (!Instance) { + // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed. return; } - m_RecoveringModules.emplace(std::string(ModuleId)); + ZEN_ASSERT(!Instance.IsRunning()); + + (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering); } ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering); - const uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; + NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {}); - auto RemoveRecoveringModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_RecoveringModules.erase(std::string(ModuleId)); - }); - - if (Instance.RecoverFromCrash()) - { - // Spawn succeeded -- instance is back in Provisioned state. - NewState = Instance.GetState(); - Instance = {}; - OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); - return; - } - - // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down - // so any salvageable data is preserved. + // Dehydrate before trying to recover so any salvageable data is preserved. try { - (void)Instance.Deprovision(); + Instance.Deprovision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); - } - Instance = {}; - - std::unique_ptr<StorageServerInstance> DestroyInstance; - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); + + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } - m_FreePorts.push_back(BasePort); - m_RecoveringModules.erase(std::string(ModuleId)); + DestroyInstance.reset(); + return; } - RemoveRecoveringModule.Dismiss(); try { - DestroyInstance.reset(); - NewState = HubInstanceState::Unprovisioned; + Instance.Provision(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; } catch (const std::exception& Ex) { - ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); - } + ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what()); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); - // Notify after all cleanup -- port is back in m_FreePorts and the callback sees - // a consistent end-state: module gone, transition complete. - OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DestroyInstance.reset(); + return; + } } void Hub::WatchDog() { - constexpr uint64_t WatchDogWakeupTimeMs = 5000; + constexpr uint64_t WatchDogWakeupTimeMs = 3000; constexpr uint64_t WatchDogProcessingTimeMs = 500; size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 @@ -1179,7 +1182,7 @@ Hub::WatchDog() { CheckInstanceIndex = 0; } - StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].get(); + StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get(); if (Instance) { LockedInstance = Instance->LockShared(/*Wait*/ false); @@ -1198,14 +1201,19 @@ Hub::WatchDog() { LockedInstance.UpdateMetrics(); } - else if (LockedInstance.GetState() == HubInstanceState::Provisioned) + else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned) { // Process is not running but state says it should be - instance died unexpectedly. const std::string ModuleId(LockedInstance.GetModuleId()); + const uint16_t Port = LockedInstance.GetBasePort(); + UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); LockedInstance = {}; AttemptRecoverInstance(ModuleId); } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) - expected, skip. + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. + // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance + // lock was busy on a previous cycle and recovery is already pending. LockedInstance = {}; // Rate-limit: pause briefly between live-instance checks and respond to shutdown. @@ -1221,11 +1229,11 @@ Hub::WatchDog() } void -Hub::OnStateUpdate(std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - std::string_view BaseUri) +Hub::NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri) { if (m_ModuleStateChangeCallback && OldState != NewState) { @@ -1295,6 +1303,42 @@ namespace hub_testutils { } }; + // Poll until Find() returns false for the given module (i.e. async deprovision completes). + static bool WaitForInstanceGone(Hub& HubInstance, + std::string_view ModuleId, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (!HubInstance.Find(ModuleId)) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return !HubInstance.Find(ModuleId); + } + + // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete). + static bool WaitForInstanceCount(Hub& HubInstance, + int ExpectedCount, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance.GetInstanceCount() == ExpectedCount) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return HubInstance.GetInstanceCount() == ExpectedCount; + } + } // namespace hub_testutils TEST_CASE("hub.provision_basic") @@ -1423,6 +1467,49 @@ TEST_CASE("hub.provision_callbacks") } } +TEST_CASE("hub.provision_callback_sequence") +{ + ScopedTemporaryDirectory TempDir; + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + ZEN_UNUSED(ModuleId); + ZEN_UNUSED(Info); + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); + + HubProvisionedInstanceInfo Info; + { + const Hub::Response R = HubInstance->Provision("seq_module", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Deprovision("seq_module"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); +} + TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; @@ -1851,18 +1938,30 @@ TEST_CASE("hub.async_hibernate_wake") CHECK(ModClient.Get("/health/")); } - // Deprovision + // Deprovision asynchronously and poll until the instance is gone { const Hub::Response R = HubInstance->Deprovision("async_hib_a"); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } - CHECK_FALSE(HubInstance->Find("async_hib_a")); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout"); } TEST_CASE("hub.recover_process_crash") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); HubProvisionedInstanceInfo Info; { @@ -1896,6 +1995,26 @@ TEST_CASE("hub.recover_process_crash") } } CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); + + // Verify the full crash/recovery callback sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_GE(Transitions.size(), 3u); + // Find the Provisioned->Crashed transition + const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { + return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; + }); + REQUIRE_NE(CrashedIt, Transitions.end()); + // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned + const auto RecoveringIt = CrashedIt + 1; + REQUIRE_NE(RecoveringIt, Transitions.end()); + CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); + CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering); + const auto RecoveredIt = RecoveringIt + 1; + REQUIRE_NE(RecoveredIt, Transitions.end()); + CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); + CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); + } } TEST_CASE("hub.recover_process_crash_then_deprovision") @@ -2025,7 +2144,7 @@ TEST_CASE("hub.async_provision_concurrent") const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); } - CHECK_EQ(HubInstance->GetInstanceCount(), 0); + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout"); } TEST_CASE("hub.async_provision_shutdown_waits") @@ -2101,7 +2220,7 @@ Hub::TerminateModuleForTesting(const std::string& ModuleId) { return; } - StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second]->LockShared(/*Wait*/ true); + StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true); if (Locked) { Locked.TerminateForTesting(); diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index f0bde10fc..eb2a06587 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -165,72 +165,79 @@ private: #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif - RwLock m_Lock; - std::unordered_map<std::string, size_t> m_InstanceLookup; - std::unordered_map<std::string, uint16_t> m_DeprovisioningModules; - std::unordered_map<std::string, uint16_t> m_ProvisioningModules; - std::unordered_map<std::string, uint16_t> m_HibernatingModules; - std::unordered_map<std::string, uint16_t> m_WakingModules; - std::unordered_set<std::string> m_RecoveringModules; - std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances; - std::vector<size_t> m_FreeActiveInstanceIndexes; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - std::atomic<int> m_MaxInstanceCount = 0; - std::deque<uint16_t> m_FreePorts; - std::thread m_WatchDog; + RwLock m_Lock; + std::unordered_map<std::string, size_t> m_InstanceLookup; + + struct ActiveInstance + { + // Invariant: Instance == nullptr if and only if State == Unprovisioned. + // Both fields are only created/destroyed under the hub's exclusive lock. + // State is an atomic because the watchdog reads it under a shared instance lock + // without holding the hub lock. + std::unique_ptr<StorageServerInstance> Instance; + std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; + // TODO: We should move current metrics here (from StorageServerInstance) + }; + + // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive + // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State. + // State mutation and notification (NotifyStateUpdate) are intentionally decoupled — see NotifyStateUpdate below. + + HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState) + { + ZEN_UNUSED(HubLock); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState); + + std::vector<ActiveInstance> m_ActiveInstances; + std::deque<size_t> m_FreeActiveInstanceIndexes; + ResourceMetrics m_ResourceLimits; + SystemMetrics m_HostMetrics; + std::atomic<int> m_MaxInstanceCount = 0; + std::thread m_WatchDog; Event m_WatchDogEvent; void WatchDog(); void AttemptRecoverInstance(std::string_view ModuleId); - void UpdateStats(); - void UpdateCapacityMetrics(); - bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); - bool IsModuleInFlightLocked(std::string_view ModuleId) const; + void UpdateStats(); + void UpdateCapacityMetrics(); + bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const; Response InternalDeprovision(const std::string& ModuleId); - void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance); - void AbortProvision(std::string_view ModuleId); - void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance); - void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance); - void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance); - - class InstanceStateUpdateGuard - { - public: - InstanceStateUpdateGuard(Hub& InHub, - std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - const std::string& BaseUri) - : m_Hub(InHub) - , m_ModuleId(ModuleId) - , m_OldState(OldState) - , m_NewState(NewState) - , m_BasePort(BasePort) - , m_BaseUri(BaseUri) - { - } - ~InstanceStateUpdateGuard() { m_Hub.OnStateUpdate(m_ModuleId, m_OldState, m_NewState, m_BasePort, m_BaseUri); } - - private: - Hub& m_Hub; - const std::string m_ModuleId; - HubInstanceState m_OldState; - HubInstanceState& m_NewState; - uint16_t m_BasePort; - const std::string m_BaseUri; - }; - - void OnStateUpdate(std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - std::string_view BaseUri); - - friend class InstanceStateUpdateGuard; + void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + + // Notifications may fire slightly out of sync with the Hub's internal State flag. + // The guarantee is that notifications are sent in the correct order, but the State + // flag may be updated either before or after the notification fires depending on the + // code path. Callers must not assume a specific ordering between the two. + void NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri); }; #if ZEN_WITH_TESTS diff --git a/src/zenserver/hub/hubinstancestate.h b/src/zenserver/hub/hubinstancestate.h index 2dee89ff0..c895f75d1 100644 --- a/src/zenserver/hub/hubinstancestate.h +++ b/src/zenserver/hub/hubinstancestate.h @@ -9,15 +9,18 @@ namespace zen { enum class HubInstanceState : uint32_t { - Unprovisioned, // Initial state; process not running - Provisioning, // Hydrating and spawning process - Provisioned, // Process running and serving requests - Hibernating, // Shutting down process, preserving data on disk - Hibernated, // Process stopped, data preserved; can be woken - Waking, // Starting process from preserved data - Deprovisioning, // Shutting down process and cleaning up data - Crashed, // Process died unexpectedly while Provisioned; recovery pending - Recovering, // Attempting in-place restart after a crash + // Stable states - possible to initiate state change to a different stable state via the transitioning states + Unprovisioned, // Initial state; process not running + Provisioned, // Process running and serving requests + Hibernated, // Process stopped, data preserved; can be woken + Crashed, // Process died unexpectedly while Provisioned; recovery pending + + // Transitioning states - there is explicit ownership during this state and it may not be stolen + Provisioning, // Unprovisioned -> Provisioned (Hydrating and spawning process) + Hibernating, // Provisioned -> Hibernated (Shutting down process, preserving data on disk) + Waking, // Hibernated -> Provisioned (Starting process from preserved data) + Deprovisioning, // Provisioned/Hibernated/Crashed -> Unprovisioned (Shutting down process and cleaning up data) + Recovering, // Crashed -> Provisioned/Deprovisioned (Attempting in-place restart after a crash) }; std::string_view ToString(HubInstanceState State); diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 99f0c29f3..6b139dbf1 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -69,177 +69,86 @@ StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); } -bool +void StorageServerInstance::ProvisionLocked() { - if (m_State.load() == HubInstanceState::Provisioned) + if (m_ServerInstance.IsRunning()) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); - return false; - } - - if (m_State.load() == HubInstanceState::Crashed) - { - ZEN_WARN("Storage server instance for module '{}' is in crashed state; re-provisioning from scratch", m_ModuleId); - m_State = HubInstanceState::Unprovisioned; - } - - if (m_State.load() == HubInstanceState::Hibernated) - { - return WakeLocked(); - } - - if (m_State.load() != HubInstanceState::Unprovisioned) - { - ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision", - m_ModuleId, - ToString(m_State.load())); - return false; + return; } ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); - - m_State = HubInstanceState::Provisioning; try { Hydrate(); SpawnServerProcess(); - m_State = HubInstanceState::Provisioned; - return true; } - catch (...) + catch (const std::exception& Ex) { - m_State = HubInstanceState::Unprovisioned; + ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during provisioning. Reason: {}", + m_ModuleId, + m_BaseDir, + Ex.what()); throw; } } -bool +void StorageServerInstance::DeprovisionLocked() { - const HubInstanceState CurrentState = m_State.load(); - if (CurrentState != HubInstanceState::Provisioned && CurrentState != HubInstanceState::Crashed && - CurrentState != HubInstanceState::Hibernated) + if (m_ServerInstance.IsRunning()) { - ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')", - m_ModuleId, - ToString(CurrentState)); - return false; - } - - ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); - - m_State = HubInstanceState::Deprovisioning; - if (CurrentState == HubInstanceState::Provisioned) - { - try - { - m_ServerInstance.Shutdown(); - } - catch (...) - { - m_State = HubInstanceState::Provisioned; // Shutdown failed; process may still be running - throw; - } + // m_ServerInstance.Shutdown() never throws. + m_ServerInstance.Shutdown(); } - // Crashed or Hibernated: process already dead; skip Shutdown + // Crashed or Hibernated: process already dead; skip Shutdown. + // Dehydrate preserves instance state for future re-provisioning. Failure means saved state + // may be stale or absent, but the process is already dead so the slot can still be released. + // Swallow the exception and proceed with cleanup rather than leaving the module stuck. try { Dehydrate(); } - catch (...) + catch (const std::exception& Ex) { - m_State = HubInstanceState::Crashed; // Dehydrate failed; process is already dead - throw; + ZEN_WARN("Dehydration of module {} failed during deprovisioning, current state not saved. Reason: {}", m_ModuleId, Ex.what()); } - - m_State = HubInstanceState::Unprovisioned; - return true; } -bool +void StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake - if (m_State.load() != HubInstanceState::Provisioned) - { - ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned (state: '{}')", - m_ModuleId, - ToString(m_State.load())); - return false; - } - if (!m_ServerInstance.IsRunning()) { - ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); - return false; + return; } - m_State = HubInstanceState::Hibernating; - try - { - m_ServerInstance.Shutdown(); - m_State = HubInstanceState::Hibernated; - return true; - } - catch (...) - { - m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running - throw; - } + // m_ServerInstance.Shutdown() never throws. + m_ServerInstance.Shutdown(); } -bool +void StorageServerInstance::WakeLocked() { // Start server in-place using existing data - if (m_State.load() != HubInstanceState::Hibernated) + if (m_ServerInstance.IsRunning()) { - ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated (state: '{}')", - m_ModuleId, - ToString(m_State.load())); - return false; + return; } - ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); - - m_State = HubInstanceState::Waking; try { SpawnServerProcess(); - m_State = HubInstanceState::Provisioned; - return true; - } - catch (...) - { - m_State = HubInstanceState::Hibernated; - throw; - } -} - -bool -StorageServerInstance::RecoverCrashedLocked() -{ - ZEN_ASSERT(m_State.load() == HubInstanceState::Provisioned); - ZEN_ASSERT(!m_ServerInstance.IsRunning()); - - ZEN_WARN("Storage server instance for module '{}' has crashed; attempting in-place recovery", m_ModuleId); - m_State = HubInstanceState::Recovering; - try - { - SpawnServerProcess(); - m_State = HubInstanceState::Provisioned; - ZEN_INFO("Storage server instance for module '{}' recovered successfully", m_ModuleId); - return true; } catch (const std::exception& Ex) { - ZEN_ERROR("Failed to restart module '{}': {}", m_ModuleId, Ex.what()); - m_State = HubInstanceState::Crashed; - return false; + ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", m_ModuleId, m_BaseDir, Ex.what()); + throw; } } @@ -337,13 +246,13 @@ bool StorageServerInstance::SharedLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); + return m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::UpdateMetricsLocked() { - if (m_State.load() == HubInstanceState::Provisioned) + if (m_ServerInstance.IsRunning()) { ProcessMetrics Metrics; zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); @@ -436,42 +345,35 @@ bool StorageServerInstance::ExclusiveLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); + return m_Instance->m_ServerInstance.IsRunning(); } -bool +void StorageServerInstance::ExclusiveLockedPtr::Provision() { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->ProvisionLocked(); + m_Instance->ProvisionLocked(); } -bool +void StorageServerInstance::ExclusiveLockedPtr::Deprovision() { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->DeprovisionLocked(); + m_Instance->DeprovisionLocked(); } -bool +void StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->HibernateLocked(); + m_Instance->HibernateLocked(); } -bool +void StorageServerInstance::ExclusiveLockedPtr::Wake() { ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->WakeLocked(); -} - -bool -StorageServerInstance::ExclusiveLockedPtr::RecoverFromCrash() -{ - ZEN_ASSERT(m_Instance != nullptr); - return m_Instance->RecoverCrashedLocked(); + m_Instance->WakeLocked(); } } // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index a0ca496dc..94c47630c 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -2,7 +2,6 @@ #pragma once -#include "hubinstancestate.h" #include "resourcemetrics.h" #include <zenutil/zenserverprocess.h> @@ -38,8 +37,7 @@ public: const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } inline std::string_view GetModuleId() const { return m_ModuleId; } - inline HubInstanceState GetState() const { return m_State.load(); } - inline uint16_t GetBasePort() const { return m_Config.BasePort; }; + inline uint16_t GetBasePort() const { return m_Config.BasePort; } void GetProcessMetrics(ProcessMetrics& OutMetrics) const; #if ZEN_PLATFORM_WINDOWS @@ -63,12 +61,7 @@ public: operator bool() const { return m_Instance != nullptr; } std::string_view GetModuleId() const; - HubInstanceState GetState() const - { - ZEN_ASSERT(m_Instance); - return m_Instance->m_State.load(); - } - uint16_t GetBasePort() const + uint16_t GetBasePort() const { ZEN_ASSERT(m_Instance); return m_Instance->GetBasePort(); @@ -114,12 +107,7 @@ public: operator bool() const { return m_Instance != nullptr; } std::string_view GetModuleId() const; - HubInstanceState GetState() const - { - ZEN_ASSERT(m_Instance); - return m_Instance->m_State.load(); - } - uint16_t GetBasePort() const + uint16_t GetBasePort() const { ZEN_ASSERT(m_Instance); return m_Instance->GetBasePort(); @@ -132,15 +120,10 @@ public: return m_Instance->m_ResourceMetrics; } - // For Provision, Deprovision, Hibernate, Wake: - // true = operation performed (state changed) - // false = precondition not met (wrong state), nothing attempted - // throws = operation attempted but failed; m_State corrected before throw - [[nodiscard]] bool Provision(); - [[nodiscard]] bool Deprovision(); - [[nodiscard]] bool Hibernate(); - [[nodiscard]] bool Wake(); - [[nodiscard]] bool RecoverFromCrash(); // true = recovered; false = spawn failed (Crashed), caller must Deprovision() + cleanup + void Provision(); + void Deprovision(); + void Hibernate(); + void Wake(); private: RwLock* m_Lock = nullptr; @@ -150,12 +133,11 @@ public: [[nodiscard]] ExclusiveLockedPtr LockExclusive(bool Wait) { return ExclusiveLockedPtr(m_Lock, this, Wait); } private: - [[nodiscard]] bool ProvisionLocked(); - [[nodiscard]] bool DeprovisionLocked(); + void ProvisionLocked(); + void DeprovisionLocked(); - [[nodiscard]] bool HibernateLocked(); - [[nodiscard]] bool WakeLocked(); - [[nodiscard]] bool RecoverCrashedLocked(); // true = recovered (Provisioned); false = spawn failed (Crashed) + void HibernateLocked(); + void WakeLocked(); void UpdateMetricsLocked(); @@ -164,8 +146,7 @@ private: std::string m_ModuleId; ZenServerInstance m_ServerInstance; - std::atomic<HubInstanceState> m_State{HubInstanceState::Unprovisioned}; - std::filesystem::path m_BaseDir; + std::filesystem::path m_BaseDir; std::filesystem::path m_TempDir; ResourceMetrics m_ResourceMetrics; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 4d70fbddd..269de28c2 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -74,6 +74,20 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "consul-health-interval-seconds", + "Interval in seconds between Consul health checks", + cxxopts::value<uint32_t>(m_ServerOptions.ConsulHealthIntervalSeconds)->default_value("10"), + "<seconds>"); + + Options.add_option("hub", + "", + "consul-deregister-after-seconds", + "Seconds after which Consul deregisters an unhealthy service", + cxxopts::value<uint32_t>(m_ServerOptions.ConsulDeregisterAfterSeconds)->default_value("30"), + "<seconds>"); + + Options.add_option("hub", + "", "hub-base-port-number", "Base port number for provisioned instances", cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), @@ -181,7 +195,8 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, { return; } - if (NewState == HubInstanceState::Provisioned) + + if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned) { consul::ServiceRegistrationInfo ServiceInfo{ .ServiceId = std::string(ModuleId), @@ -191,8 +206,12 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), std::make_pair("zen-hub", std::string(HubInstanceId)), std::make_pair("version", std::string(ZEN_CFG_VERSION))}, - .HealthIntervalSeconds = 10, - .DeregisterAfterSeconds = 30}; + .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning + ? 0u + : m_ConsulHealthIntervalSeconds, // Disable health checks while not finished provisioning + .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning + ? 0u + : m_ConsulDeregisterAfterSeconds}; // Disable health checks while not finished provisioning if (!m_ConsulClient->RegisterService(ServiceInfo)) { @@ -219,7 +238,7 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); } } - // Transitional states (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) + // Transitional states (Deprovisioning, Hibernating, Waking, Recovering, Crashed) // and Hibernated are intentionally ignored. } @@ -385,7 +404,9 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi try { - m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint, ConsulAccessToken); + m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint, ConsulAccessToken); + m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; + m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; consul::ServiceRegistrationInfo Info; Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); @@ -399,6 +420,8 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)), std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)), std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))}; + Info.HealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; + Info.DeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info); diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 0fb192b9f..c4a45a8fe 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -26,14 +26,16 @@ struct ZenHubServerConfig : public ZenServerConfig std::string InstanceId; // For use in notifications std::string ConsulEndpoint; // If set, enables Consul service registration std::string ConsulTokenEnv; // Environment variable name to read a Consul token from; defaults to CONSUL_HTTP_TOKEN if empty - uint16_t HubBasePortNumber = 21000; - int HubInstanceLimit = 1000; - bool HubUseJobObject = true; - std::string HubInstanceHttpClass = "asio"; - uint32_t HubInstanceHttpThreadCount = 0; // Automatic - int HubInstanceCoreLimit = 0; // Automatic - std::filesystem::path HubInstanceConfigPath; // Path to Lua config file - std::string HydrationTargetSpecification; // hydration/dehydration target specification + uint32_t ConsulHealthIntervalSeconds = 10; // Interval in seconds between Consul health checks + uint32_t ConsulDeregisterAfterSeconds = 30; // Seconds before Consul deregisters an unhealthy service + uint16_t HubBasePortNumber = 21000; + int HubInstanceLimit = 1000; + bool HubUseJobObject = true; + std::string HubInstanceHttpClass = "asio"; + uint32_t HubInstanceHttpThreadCount = 0; // Automatic + int HubInstanceCoreLimit = 0; // Automatic + std::filesystem::path HubInstanceConfigPath; // Path to Lua config file + std::string HydrationTargetSpecification; // hydration/dehydration target specification }; class Hub; @@ -108,6 +110,8 @@ private: std::unique_ptr<consul::ConsulClient> m_ConsulClient; std::unique_ptr<consul::ServiceRegistration> m_ConsulRegistration; + uint32_t m_ConsulHealthIntervalSeconds = 10; + uint32_t m_ConsulDeregisterAfterSeconds = 30; void InitializeState(const ZenHubServerConfig& ServerConfig); void InitializeServices(const ZenHubServerConfig& ServerConfig); |