aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-27 09:51:29 +0100
committerGitHub Enterprise <[email protected]>2026-03-27 09:51:29 +0100
commite811745e5c37dd38a8fb9f4bc2892525401eabbd (patch)
tree63896cabc0eb895887dc8247bb573f0dfd696afa /src/zenserver/hub/hub.cpp
parenthub async provision/deprovision/hibernate/wake (#891) (diff)
downloadzen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.tar.xz
zen-e811745e5c37dd38a8fb9f4bc2892525401eabbd.zip
hub instance state refactor (#892)
- Improvement: Provisioning a hibernated instance now automatically wakes it instead of requiring an explicit wake call first
- Improvement: Deprovisioning now accepts instances in Crashed or Hibernated states, not just Provisioned
- Improvement: Added `--consul-health-interval-seconds` and `--consul-deregister-after-seconds` options to control Consul health check behavior (defaults: 10s and 30s)
- Improvement: Consul registration now occurs when provisioning starts; health check intervals are applied once provisioning completes
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r-- src/zenserver/hub/hub.cpp | 987
1 file changed, 553 insertions(+), 434 deletions(-)
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index ceba21d8d..db406947a 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -131,6 +131,8 @@ Hub::Hub(const Configuration& Config,
, m_WorkerPool(OptionalWorkerPool)
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
+, m_ActiveInstances(Config.InstanceLimit)
+, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
m_HostMetrics = GetSystemMetrics();
m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
@@ -153,10 +155,7 @@ Hub::Hub(const Configuration& Config,
ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max());
m_InstanceLookup.reserve(Config.InstanceLimit);
- m_ActiveInstances.reserve(Config.InstanceLimit);
-
- m_FreePorts.resize(Config.InstanceLimit);
- std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber);
+ std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0);
#if ZEN_PLATFORM_WINDOWS
if (m_Config.UseJobObject)
@@ -240,26 +239,11 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
- bool IsNewInstance = false;
- uint16_t AllocatedPort = 0;
+ bool IsNewInstance = false;
+ size_t ActiveInstanceIndex = (size_t)-1;
+ HubInstanceState OldState = HubInstanceState::Unprovisioned;
{
RwLock::ExclusiveLockScope _(m_Lock);
- auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() {
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
- {
- m_FreePorts.push_back(AllocatedPort);
- AllocatedPort = 0;
- }
- });
-
- if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end())
- {
- // Same operation already in flight -- return the already-allocated port.
- // RestoreAllocatedPort is a no-op here because IsNewInstance is still false
- // (we return before line 273 where it is set), so no port is double-freed.
- OutInfo.Port = It->second;
- return Response{EResponseCode::Accepted};
- }
if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
@@ -273,44 +257,48 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
IsNewInstance = true;
- AllocatedPort = m_FreePorts.front();
- ZEN_ASSERT(AllocatedPort != 0);
- m_FreePorts.pop_front();
+ ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front();
+ m_FreeActiveInstanceIndexes.pop_front();
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
- auto NewInstance = std::make_unique<StorageServerInstance>(
- m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = AllocatedPort,
- .HydrationTempPath = m_HydrationTempPath,
- .HydrationTargetSpecification = m_HydrationTargetSpecification,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath},
- ModuleId);
+ try
+ {
+ auto NewInstance = std::make_unique<StorageServerInstance>(
+ m_RunEnvironment,
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .HydrationTempPath = m_HydrationTempPath,
+ .HydrationTargetSpecification = m_HydrationTargetSpecification,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath},
+ ModuleId);
#if ZEN_PLATFORM_WINDOWS
- if (m_JobObject.IsValid())
- {
- NewInstance->SetJobObject(&m_JobObject);
- }
+ if (m_JobObject.IsValid())
+ {
+ NewInstance->SetJobObject(&m_JobObject);
+ }
#endif
- Instance = NewInstance->LockExclusive(/*Wait*/ true);
+ Instance = NewInstance->LockExclusive(/*Wait*/ true);
- size_t ActiveInstanceIndex = (size_t)-1;
- if (!m_FreeActiveInstanceIndexes.empty())
- {
- ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back();
- m_FreeActiveInstanceIndexes.pop_back();
- ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
- m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance);
+ m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
+ m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
+ // Set Provisioning while both hub lock and instance lock are held so that any
+ // concurrent Deprovision sees the in-flight state, not Unprovisioned.
+ OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
}
- else
+ catch (const std::exception&)
{
- ActiveInstanceIndex = m_ActiveInstances.size();
- m_ActiveInstances.emplace_back(std::move(NewInstance));
+ Instance = {};
+ m_ActiveInstances[ActiveInstanceIndex].Instance.reset();
+ m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned);
+ m_InstanceLookup.erase(std::string(ModuleId));
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ throw;
}
- ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
- m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
+
+ OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex);
ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
@@ -322,36 +310,68 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
}
else
{
- const size_t ActiveInstanceIndex = It->second;
+ ActiveInstanceIndex = It->second;
ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
- if (m_RecoveringModules.contains(std::string(ModuleId)))
- {
- ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId);
- return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
- }
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(InstanceRaw);
- if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ OutInfo.Port = InstanceRaw->GetBasePort();
+
+ switch (CurrentState)
{
- OutInfo.Port = InstanceRaw->GetBasePort();
- return Response{EResponseCode::Completed};
+ case HubInstanceState::Provisioning:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Unprovisioned:
+ break;
+ case HubInstanceState::Provisioned:
+ return Response{EResponseCode::Completed};
+ case HubInstanceState::Hibernated:
+ _.ReleaseNow();
+ return Wake(std::string(ModuleId));
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
}
- Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- AllocatedPort = InstanceRaw->GetBasePort();
- }
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort);
+ // Re-validate state after acquiring the instance lock: a concurrent Provision may have
+ // completed between our hub-lock read and LockExclusive, transitioning the state away
+ // from Crashed/Unprovisioned.
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned)
+ {
+ Instance = {};
+ if (ActualState == HubInstanceState::Provisioned)
+ {
+ return Response{EResponseCode::Completed};
+ }
+ if (ActualState == HubInstanceState::Provisioning)
+ {
+ return Response{EResponseCode::Accepted};
+ }
+ return Response{
+ EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))};
+ }
+ // Set Provisioning while both hub lock and instance lock are held so that any
+ // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned.
+ OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
+ }
}
// NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // m_ProvisioningModules tracks which modules are being provisioned, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ // Both hub-lock paths above set OldState and updated the state to Provisioning before
+ // releasing the hub lock, so concurrent operations already see the in-flight state.
ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {});
if (m_WorkerPool)
{
@@ -361,13 +381,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
m_WorkerPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
- AllocatedPort,
+ ActiveInstanceIndex,
+ OldState,
IsNewInstance,
Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteProvision(*Instance, AllocatedPort, IsNewInstance);
+ CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance);
}
catch (const std::exception& Ex)
{
@@ -378,121 +399,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
}
catch (const std::exception& DispatchEx)
{
+ // Dispatch failed: undo latch increment and roll back state.
ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
- if (IsNewInstance)
- {
- try
- {
- AbortProvision(ModuleId);
- }
- catch (const std::exception& DestroyEx)
- {
- ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
- }
- }
-
- Instance = {};
+ // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {});
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_ProvisioningModules.erase(std::string(ModuleId));
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ if (IsNewInstance)
{
- m_FreePorts.push_back(AllocatedPort);
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(std::string(ModuleId));
}
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
+ DestroyInstance.reset();
throw;
}
}
else
{
- CompleteProvision(Instance, AllocatedPort, IsNewInstance);
+ CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance);
}
- OutInfo.Port = AllocatedPort;
-
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance)
+Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance)
{
- const std::string ModuleId(Instance.GetModuleId());
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
-
- auto RemoveProvisioningModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_ProvisioningModules.erase(std::string(ModuleId));
- if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
- {
- m_FreePorts.push_back(AllocatedPort);
- }
- });
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
if (m_ShutdownFlag.load() == false)
{
try
{
- (void)Instance.Provision(); // false = already in target state (idempotent); not an error
- NewState = Instance.GetState();
- AllocatedPort = 0;
- Instance = {};
+ switch (OldState)
+ {
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Unprovisioned:
+ Instance.Provision();
+ break;
+ case HubInstanceState::Hibernated:
+ ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri);
+ Instance = {};
+ return;
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ // Instance will be notified and removed below.
}
}
- if (Instance)
+ if (IsNewInstance)
{
- NewState = Instance.GetState();
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {});
Instance = {};
-
- if (IsNewInstance)
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
{
- try
- {
- AbortProvision(ModuleId);
- NewState = HubInstanceState::Unprovisioned;
- }
- catch (const std::exception& DestroyEx)
- {
- ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
- }
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(std::string(ModuleId));
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
}
+ DestroyInstance.reset();
}
-}
-
-void
-Hub::AbortProvision(std::string_view ModuleId)
-{
- std::unique_ptr<StorageServerInstance> DestroyInstance;
+ else
{
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
- {
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
- ZEN_ASSERT(DestroyInstance);
- ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- }
- else
- {
- ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId);
- }
+ // OldState = Crashed: restore without cleanup (instance stays in lookup)
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {});
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ Instance = {};
}
- DestroyInstance.reset();
}
Hub::Response
@@ -505,30 +505,11 @@ Hub::Deprovision(const std::string& ModuleId)
Hub::Response
Hub::InternalDeprovision(const std::string& ModuleId)
{
- std::unique_ptr<StorageServerInstance> RawInstance;
StorageServerInstance::ExclusiveLockedPtr Instance;
-
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_ProvisioningModules.contains(ModuleId))
- {
- ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
-
- return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)};
- }
-
- if (m_RecoveringModules.contains(ModuleId))
- {
- ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId);
- return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
- }
-
- if (m_DeprovisioningModules.contains(ModuleId))
- {
- return Response{EResponseCode::Accepted};
- }
-
if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
{
ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
@@ -537,41 +518,64 @@ Hub::InternalDeprovision(const std::string& ModuleId)
}
else
{
- const size_t ActiveInstanceIndex = It->second;
+ ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
- ZEN_ASSERT(RawInstance != nullptr);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort());
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
+ {
+ case HubInstanceState::Deprovisioning:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Provisioned:
+ break;
+ case HubInstanceState::Unprovisioned:
+ return Response{EResponseCode::Completed};
+ case HubInstanceState::Recovering:
+ // Recovering is watchdog-managed; reject to avoid interfering with the in-progress
+ // recovery. The watchdog will transition to Provisioned or Unprovisioned, after
+ // which deprovision can be retried.
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
+ }
+
+ std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(RawInstance != nullptr);
Instance = RawInstance->LockExclusive(/*Wait*/ true);
}
}
// NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ // The exclusive instance lock acquired above prevents concurrent LockExclusive callers
+ // from modifying instance state. The state transition to Deprovisioning happens below,
+ // after the hub lock is released.
- ZEN_ASSERT(RawInstance);
ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {});
if (m_WorkerPool)
{
+ std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
+ std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
+
m_BackgroundWorkLatch.AddCount(1);
try
{
m_WorkerPool->ScheduleWork(
- [this,
- ModuleId = std::string(ModuleId),
- Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)),
- RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable {
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteDeprovision(*Instance);
- RawInstance.reset();
+ CompleteDeprovision(*Instance, ActiveInstanceIndex);
}
catch (const std::exception& Ex)
{
@@ -582,67 +586,69 @@ Hub::InternalDeprovision(const std::string& ModuleId)
}
catch (const std::exception& DispatchEx)
{
+ // Dispatch failed: undo latch increment and roll back state.
ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- uint16_t BasePort = Instance.GetBasePort();
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {});
-
- // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
- Instance = {};
- NewState = HubInstanceState::Unprovisioned;
-
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {});
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(std::string(ModuleId));
- m_FreePorts.push_back(BasePort);
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
+
throw;
}
}
else
{
- CompleteDeprovision(Instance);
- RawInstance.reset();
+ CompleteDeprovision(Instance, ActiveInstanceIndex);
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
- const std::string ModuleId(Instance.GetModuleId());
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
-
- auto _ = MakeGuard([&] {
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(std::string(ModuleId));
- m_FreePorts.push_back(BasePort);
- }
- });
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
try
{
- (void)Instance.Deprovision();
- NewState = Instance.GetState();
- Instance = {};
+ Instance.Deprovision();
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
- // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
- NewState = HubInstanceState::Unprovisioned;
+ // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed
+ // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed
+ // so the watchdog can attempt recovery.
Instance = {};
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
+ }
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {});
throw;
}
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {});
+ Instance = {};
+
+ std::unique_ptr<StorageServerInstance> DeleteInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
+ DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DeleteInstance.reset();
}
Hub::Response
@@ -651,59 +657,64 @@ Hub::Hibernate(const std::string& ModuleId)
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_HibernatingModules.contains(ModuleId))
- {
- return Response{EResponseCode::Accepted};
- }
-
- if (IsModuleInFlightLocked(ModuleId))
- {
- return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
- }
-
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
return Response{EResponseCode::NotFound};
}
- const size_t ActiveInstanceIndex = It->second;
+ ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- ZEN_ASSERT(InstanceRaw);
- if (InstanceRaw->GetState() == HubInstanceState::Hibernated)
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
{
- return Response{EResponseCode::Completed};
+ case HubInstanceState::Hibernating:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Provisioned:
+ break;
+ case HubInstanceState::Hibernated:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
}
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+
Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort());
+
+ // Re-validate state after acquiring the instance lock: WatchDog may have transitioned
+ // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above.
+
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Provisioned)
+ {
+ Instance = {};
+ return Response{
+ EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))};
+ }
}
// NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // m_HibernatingModules tracks which modules are being hibernated, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on
+ // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
+ // will have already changed the state and the re-validate above will reject it.
ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
- // Validate state while holding the exclusive instance lock (state cannot change).
- // This gives a synchronous error response for invalid-state calls on both the sync
- // and async paths, matching the existing behaviour.
- if (Instance.GetState() != HubInstanceState::Provisioned)
- {
- const Response HibernateResp =
- Response{EResponseCode::Rejected,
- fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))};
- Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
- RwLock::ExclusiveLockScope _(m_Lock);
- m_HibernatingModules.erase(ModuleId);
- return HibernateResp;
- }
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {});
if (m_WorkerPool)
{
@@ -713,11 +724,13 @@ Hub::Hibernate(const std::string& ModuleId)
m_WorkerPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
+ ActiveInstanceIndex,
+ OldState,
Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteHibernate(*Instance);
+ CompleteHibernate(*Instance, ActiveInstanceIndex, OldState);
}
catch (const std::exception& Ex)
{
@@ -728,58 +741,47 @@ Hub::Hibernate(const std::string& ModuleId)
}
catch (const std::exception& DispatchEx)
{
- // Dispatch failed: undo the latch increment and tracking-set membership.
- // State has not changed so no callback is needed.
- // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ // Dispatch failed: undo latch increment and roll back state.
ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
- Instance = {};
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_HibernatingModules.erase(std::string(ModuleId));
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
+
throw;
}
}
else
{
- CompleteHibernate(Instance);
+ CompleteHibernate(Instance, ActiveInstanceIndex, OldState);
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance)
+Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
- const std::string ModuleId(Instance.GetModuleId());
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
-
- auto RemoveHibernatingModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_HibernatingModules.erase(std::string(ModuleId));
- });
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
try
{
- if (!Instance.Hibernate())
- {
- ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- Instance = {};
- return;
- }
- NewState = Instance.GetState();
+ Instance.Hibernate();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {});
Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
- NewState = Instance.GetState();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
Instance = {};
throw;
}
@@ -791,59 +793,62 @@ Hub::Wake(const std::string& ModuleId)
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_WakingModules.contains(ModuleId))
- {
- return Response{EResponseCode::Accepted};
- }
-
- if (IsModuleInFlightLocked(ModuleId))
- {
- return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
- }
-
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
return Response{EResponseCode::NotFound};
}
- const size_t ActiveInstanceIndex = It->second;
+ ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- ZEN_ASSERT(InstanceRaw);
- if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
{
- return Response{EResponseCode::Completed};
+ case HubInstanceState::Waking:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Hibernated:
+ break;
+ case HubInstanceState::Provisioned:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
}
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+
Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- m_WakingModules.emplace(ModuleId, Instance.GetBasePort());
+
+ // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may
+ // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive.
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Hibernated)
+ {
+ Instance = {};
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))};
+ }
}
// NOTE: done while not holding the hub lock, to avoid blocking other operations.
- // m_WakingModules tracks which modules are being woken, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on
+ // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
+ // will have already changed the state and the re-validate above will reject it.
ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
- // Validate state while holding the exclusive instance lock (state cannot change).
- // This gives a synchronous error response for invalid-state calls on both the sync
- // and async paths, matching the existing behaviour.
- if (Instance.GetState() != HubInstanceState::Hibernated)
- {
- const Response WakeResp =
- Response{EResponseCode::Rejected,
- fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))};
- Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
- RwLock::ExclusiveLockScope _(m_Lock);
- m_WakingModules.erase(ModuleId);
- return WakeResp;
- }
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {});
if (m_WorkerPool)
{
@@ -853,11 +858,13 @@ Hub::Wake(const std::string& ModuleId)
m_WorkerPool->ScheduleWork(
[this,
ModuleId = std::string(ModuleId),
+ ActiveInstanceIndex,
+ OldState,
Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- CompleteWake(*Instance);
+ CompleteWake(*Instance, ActiveInstanceIndex, OldState);
}
catch (const std::exception& Ex)
{
@@ -868,58 +875,47 @@ Hub::Wake(const std::string& ModuleId)
}
catch (const std::exception& DispatchEx)
{
- // Dispatch failed: undo the latch increment and tracking-set membership.
- // State has not changed so no callback is needed.
- // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ // Dispatch failed: undo latch increment and roll back state.
ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
m_BackgroundWorkLatch.CountDown();
- Instance = {};
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_WakingModules.erase(std::string(ModuleId));
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
+
throw;
}
}
else
{
- CompleteWake(Instance);
+ CompleteWake(Instance, ActiveInstanceIndex, OldState);
}
return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
void
-Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance)
+Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
- const std::string ModuleId(Instance.GetModuleId());
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
-
- auto RemoveWakingModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_WakingModules.erase(std::string(ModuleId));
- });
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
try
{
- if (!Instance.Wake())
- {
- ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- Instance = {};
- return;
- }
- NewState = Instance.GetState();
+ Instance.Wake();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {});
Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
- NewState = Instance.GetState();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
Instance = {};
throw;
}
@@ -935,10 +931,10 @@ Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
{
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex];
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(Instance);
InstanceInfo Info{
- Instance->GetState(),
+ m_ActiveInstances[ActiveInstanceIndex].State.load(),
std::chrono::system_clock::now() // TODO
};
Instance->GetProcessMetrics(Info.Metrics);
@@ -959,10 +955,10 @@ Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const Instan
RwLock::SharedLockScope _(m_Lock);
for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
{
- const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex];
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
ZEN_ASSERT(Instance);
InstanceInfo Info{
- Instance->GetState(),
+ m_ActiveInstances[ActiveInstanceIndex].State.load(),
std::chrono::system_clock::now() // TODO
};
Instance->GetProcessMetrics(Info.Metrics);
@@ -1007,152 +1003,159 @@ Hub::UpdateStats()
bool
Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
{
- if (m_DeprovisioningModules.contains(std::string(ModuleId)))
- {
- OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
-
- return false;
- }
-
- if (m_ProvisioningModules.contains(std::string(ModuleId)))
- {
- OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
-
- return false;
- }
-
- if (gsl::narrow_cast<int>(m_InstanceLookup.size()) >= m_Config.InstanceLimit)
+ ZEN_UNUSED(ModuleId);
+ if (m_FreeActiveInstanceIndexes.empty())
{
OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
return false;
}
- // Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below
- // the instance count limit but with no free ports available
- if (m_FreePorts.empty())
- {
- OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})",
- m_Config.InstanceLimit - m_InstanceLookup.size());
-
- return false;
- }
-
// TODO: handle additional resource metrics
return true;
}
-bool
-Hub::IsModuleInFlightLocked(std::string_view ModuleId) const
+uint16_t
+Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const
+{
+ return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex);
+}
+
+HubInstanceState
+Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState)
{
- const std::string Key(ModuleId);
- return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) ||
- m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key);
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) {
+ switch (From)
+ {
+ case HubInstanceState::Unprovisioned:
+ return To == HubInstanceState::Provisioning;
+ case HubInstanceState::Provisioned:
+ return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed;
+ case HubInstanceState::Hibernated:
+ return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning;
+ case HubInstanceState::Crashed:
+ return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering;
+ case HubInstanceState::Provisioning:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
+ case HubInstanceState::Hibernating:
+ return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned;
+ case HubInstanceState::Waking:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated;
+ case HubInstanceState::Deprovisioning:
+ return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated ||
+ To == HubInstanceState::Crashed;
+ case HubInstanceState::Recovering:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned;
+ }
+ return false;
+ }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
+ return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
}
void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (IsModuleInFlightLocked(ModuleId))
- {
- return;
- }
-
auto It = m_InstanceLookup.find(std::string(ModuleId));
if (It == m_InstanceLookup.end())
{
return;
}
- const size_t ActiveInstanceIndex = It->second;
+ ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (CurrentState != HubInstanceState::Crashed)
+ {
+ return;
+ }
- // Definitive check while hub lock is held: exclusive instance lock is also held,
- // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision)
- // or the process restarted since the watchdog fired.
- if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
+ Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false);
+ if (!Instance)
{
+ // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed.
return;
}
- m_RecoveringModules.emplace(std::string(ModuleId));
+ ZEN_ASSERT(!Instance.IsRunning());
+
+ (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering);
}
ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+ ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering);
- const uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
+ NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {});
- auto RemoveRecoveringModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_RecoveringModules.erase(std::string(ModuleId));
- });
-
- if (Instance.RecoverFromCrash())
- {
- // Spawn succeeded -- instance is back in Provisioned state.
- NewState = Instance.GetState();
- Instance = {};
- OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
- return;
- }
-
- // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down
- // so any salvageable data is preserved.
+ // Dehydrate before trying to recover so any salvageable data is preserved.
try
{
- (void)Instance.Deprovision();
+ Instance.Deprovision();
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
- }
- Instance = {};
-
- std::unique_ptr<StorageServerInstance> DestroyInstance;
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
{
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second);
+
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
m_InstanceLookup.erase(It);
+ (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
}
- m_FreePorts.push_back(BasePort);
- m_RecoveringModules.erase(std::string(ModuleId));
+ DestroyInstance.reset();
+ return;
}
- RemoveRecoveringModule.Dismiss();
try
{
- DestroyInstance.reset();
- NewState = HubInstanceState::Unprovisioned;
+ Instance.Provision();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what());
- }
+ ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what());
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second);
- // Notify after all cleanup -- port is back in m_FreePorts and the callback sees
- // a consistent end-state: module gone, transition complete.
- OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DestroyInstance.reset();
+ return;
+ }
}
void
Hub::WatchDog()
{
- constexpr uint64_t WatchDogWakeupTimeMs = 5000;
+ constexpr uint64_t WatchDogWakeupTimeMs = 3000;
constexpr uint64_t WatchDogProcessingTimeMs = 500;
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
@@ -1179,7 +1182,7 @@ Hub::WatchDog()
{
CheckInstanceIndex = 0;
}
- StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].get();
+ StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get();
if (Instance)
{
LockedInstance = Instance->LockShared(/*Wait*/ false);
@@ -1198,14 +1201,19 @@ Hub::WatchDog()
{
LockedInstance.UpdateMetrics();
}
- else if (LockedInstance.GetState() == HubInstanceState::Provisioned)
+ else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned)
{
// Process is not running but state says it should be - instance died unexpectedly.
const std::string ModuleId(LockedInstance.GetModuleId());
+ const uint16_t Port = LockedInstance.GetBasePort();
+ UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
LockedInstance = {};
AttemptRecoverInstance(ModuleId);
}
- // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) - expected, skip.
+ // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
+ // lock was busy on a previous cycle and recovery is already pending.
LockedInstance = {};
// Rate-limit: pause briefly between live-instance checks and respond to shutdown.
@@ -1221,11 +1229,11 @@ Hub::WatchDog()
}
void
-Hub::OnStateUpdate(std::string_view ModuleId,
- HubInstanceState OldState,
- HubInstanceState& NewState,
- uint16_t BasePort,
- std::string_view BaseUri)
+Hub::NotifyStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri)
{
if (m_ModuleStateChangeCallback && OldState != NewState)
{
@@ -1295,6 +1303,42 @@ namespace hub_testutils {
}
};
+ // Poll until Find() returns false for the given module (i.e. async deprovision completes).
+ static bool WaitForInstanceGone(Hub& HubInstance,
+ std::string_view ModuleId,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (!HubInstance.Find(ModuleId))
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return !HubInstance.Find(ModuleId);
+ }
+
+ // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete).
+ static bool WaitForInstanceCount(Hub& HubInstance,
+ int ExpectedCount,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance.GetInstanceCount() == ExpectedCount)
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return HubInstance.GetInstanceCount() == ExpectedCount;
+ }
+
} // namespace hub_testutils
TEST_CASE("hub.provision_basic")
@@ -1423,6 +1467,49 @@ TEST_CASE("hub.provision_callbacks")
}
}
+TEST_CASE("hub.provision_callback_sequence")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ struct TransitionRecord
+ {
+ HubInstanceState OldState;
+ HubInstanceState NewState;
+ };
+ RwLock CaptureMutex;
+ std::vector<TransitionRecord> Transitions;
+
+ auto CaptureFunc =
+ [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
+ ZEN_UNUSED(ModuleId);
+ ZEN_UNUSED(Info);
+ CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
+
+ HubProvisionedInstanceInfo Info;
+ {
+ const Hub::Response R = HubInstance->Provision("seq_module", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Deprovision("seq_module");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ RwLock::SharedLockScope _(CaptureMutex);
+ REQUIRE_EQ(Transitions.size(), 4u);
+ CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
+ CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
+}
+
TEST_CASE("hub.instance_limit")
{
ScopedTemporaryDirectory TempDir;
@@ -1851,18 +1938,30 @@ TEST_CASE("hub.async_hibernate_wake")
CHECK(ModClient.Get("/health/"));
}
- // Deprovision
+ // Deprovision asynchronously and poll until the instance is gone
{
const Hub::Response R = HubInstance->Deprovision("async_hib_a");
CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
}
- CHECK_FALSE(HubInstance->Find("async_hib_a"));
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout");
}
TEST_CASE("hub.recover_process_crash")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ struct TransitionRecord
+ {
+ HubInstanceState OldState;
+ HubInstanceState NewState;
+ };
+ RwLock CaptureMutex;
+ std::vector<TransitionRecord> Transitions;
+ auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) {
+ CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
HubProvisionedInstanceInfo Info;
{
@@ -1896,6 +1995,26 @@ TEST_CASE("hub.recover_process_crash")
}
}
CHECK_MESSAGE(Recovered, "Instance did not recover within timeout");
+
+ // Verify the full crash/recovery callback sequence
+ {
+ RwLock::SharedLockScope _(CaptureMutex);
+ REQUIRE_GE(Transitions.size(), 3u);
+ // Find the Provisioned->Crashed transition
+ const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) {
+ return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed;
+ });
+ REQUIRE_NE(CrashedIt, Transitions.end());
+ // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned
+ const auto RecoveringIt = CrashedIt + 1;
+ REQUIRE_NE(RecoveringIt, Transitions.end());
+ CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed);
+ CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering);
+ const auto RecoveredIt = RecoveringIt + 1;
+ REQUIRE_NE(RecoveredIt, Transitions.end());
+ CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering);
+ CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned);
+ }
}
TEST_CASE("hub.recover_process_crash_then_deprovision")
@@ -2025,7 +2144,7 @@ TEST_CASE("hub.async_provision_concurrent")
const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
}
- CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout");
}
TEST_CASE("hub.async_provision_shutdown_waits")
@@ -2101,7 +2220,7 @@ Hub::TerminateModuleForTesting(const std::string& ModuleId)
{
return;
}
- StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second]->LockShared(/*Wait*/ true);
+ StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true);
if (Locked)
{
Locked.TerminateForTesting();