diff options
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 1090 |
1 files changed, 826 insertions, 264 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 6a2609443..ceba21d8d 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -10,6 +10,7 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> +#include <zencore/workthreadpool.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/filesystem.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/workthreadpool.h> # include <zenhttp/httpclient.h> #endif @@ -122,9 +122,14 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) +Hub::Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + WorkerThreadPool* OptionalWorkerPool, + AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) +, m_WorkerPool(OptionalWorkerPool) +, m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) { m_HostMetrics = GetSystemMetrics(); @@ -196,42 +201,44 @@ Hub::Shutdown() m_WatchDog = {}; - m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) - { - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - { - StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); + bool Expected = false; + bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true); + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + // Shutdown flag is set and all background work is drained, safe to shut down remaining instances - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + m_BackgroundWorkLatch.Reset(1); + } - try - { - (void)Instance.Deprovision(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); - } - // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire. - NewState = HubInstanceState::Unprovisioned; - Instance = {}; + EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) { + ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings... + try + { + const Response DepResp = InternalDeprovision(std::string(ModuleId)); + if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted) + { + ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message); } - InstanceRaw.reset(); } - m_InstanceLookup.clear(); - m_ActiveInstances.clear(); - m_FreeActiveInstanceIndexes.clear(); + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); + } }); + + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + } } -bool -Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) +Hub::Response +Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { + ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; bool IsNewInstance = false; uint16_t AllocatedPort = 0; @@ -245,6 +252,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s } }); + if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end()) + { + // Same operation already in flight -- return the already-allocated port. + // RestoreAllocatedPort is a no-op here because IsNewInstance is still false + // (we return before line 273 where it is set), so no port is double-freed. + OutInfo.Port = It->second; + return Response{EResponseCode::Accepted}; + } + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { std::string Reason; @@ -252,9 +268,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s { ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); - OutReason = Reason; - - return false; + return Response{EResponseCode::Rejected, Reason}; } IsNewInstance = true; @@ -313,21 +327,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s if (m_RecoveringModules.contains(std::string(ModuleId))) { - OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; } std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - AllocatedPort = InstanceRaw->GetBasePort(); + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + { + OutInfo.Port = InstanceRaw->GetBasePort(); + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + AllocatedPort = InstanceRaw->GetBasePort(); } - m_ProvisioningModules.emplace(std::string(ModuleId)); + m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_ProvisioningModules tracks which modules are being provisioned, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(Instance); + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + AllocatedPort, + IsNewInstance, + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteProvision(*Instance, AllocatedPort, IsNewInstance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + if (IsNewInstance) + { + try + { + AbortProvision(ModuleId); + } + catch (const std::exception& DestroyEx) + { + ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); + } + } + + Instance = {}; + + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProvisioningModules.erase(std::string(ModuleId)); + if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) + { + m_FreePorts.push_back(AllocatedPort); + } + } + + throw; + } + } + else + { + CompleteProvision(Instance, AllocatedPort, IsNewInstance); + } + + OutInfo.Port = AllocatedPort; + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? HubInstanceState OldState = Instance.GetState(); @@ -340,47 +433,34 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) { m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; } }); - // NOTE: this is done while not holding the hub lock, as provisioning may take time - // and we don't want to block other operations. We track which modules are being - // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision - // those modules while in this state. - - try + if (m_ShutdownFlag.load() == false) { - (void)Instance.Provision(); // false = already in target state (idempotent); not an error - NewState = Instance.GetState(); - Instance = {}; + try + { + (void)Instance.Provision(); // false = already in target state (idempotent); not an error + NewState = Instance.GetState(); + AllocatedPort = 0; + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + } } - catch (const std::exception& Ex) + + if (Instance) { - ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); NewState = Instance.GetState(); Instance = {}; if (IsNewInstance) { - // Clean up failed instance provisioning - std::unique_ptr<StorageServerInstance> DestroyInstance; - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) - { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(DestroyInstance); - ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - } - } try { - DestroyInstance.reset(); + AbortProvision(ModuleId); NewState = HubInstanceState::Unprovisioned; } catch (const std::exception& DestroyEx) @@ -388,18 +468,42 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); } } - throw; } +} - OutReason.clear(); - OutInfo.Port = AllocatedPort; - // TODO: base URI? Would need to know what host name / IP to use +void +Hub::AbortProvision(std::string_view ModuleId) +{ + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + { + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + ZEN_ASSERT(DestroyInstance); + ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + } + else + { + ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId); + } + } + DestroyInstance.reset(); +} - return true; +Hub::Response +Hub::Deprovision(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + return InternalDeprovision(ModuleId); } -bool -Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::InternalDeprovision(const std::string& ModuleId) { std::unique_ptr<StorageServerInstance> RawInstance; StorageServerInstance::ExclusiveLockedPtr Instance; @@ -409,26 +513,27 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) if (m_ProvisioningModules.contains(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)}; } if (m_RecoveringModules.contains(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + } + + if (m_DeprovisioningModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; } if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } else { @@ -436,33 +541,90 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); ZEN_ASSERT(RawInstance != nullptr); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); - m_DeprovisioningModules.emplace(ModuleId); + m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort()); Instance = RawInstance->LockExclusive(/*Wait*/ true); } } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)), + RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteDeprovision(*Instance); + RawInstance.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + uint16_t BasePort = Instance.GetBasePort(); + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {}); + + // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. + Instance = {}; + NewState = HubInstanceState::Unprovisioned; + + { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(std::string(ModuleId)); + m_FreePorts.push_back(BasePort); + } + throw; + } + } + else + { + CompleteDeprovision(Instance); + RawInstance.reset(); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? HubInstanceState OldState = Instance.GetState(); HubInstanceState NewState = OldState; InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - // The module is deprovisioned outside the hub lock to avoid blocking other operations. - // - // To ensure that no new provisioning can occur while we're deprovisioning, - // we add the module ID to m_DeprovisioningModules and remove it once - // deprovisioning is complete. - auto _ = MakeGuard([&] { { RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(ModuleId); + m_DeprovisioningModules.erase(std::string(ModuleId)); m_FreePorts.push_back(BasePort); } }); @@ -470,6 +632,8 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) try { (void)Instance.Deprovision(); + NewState = Instance.GetState(); + Instance = {}; } catch (const std::exception& Ex) { @@ -479,42 +643,116 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - NewState = Instance.GetState(); - Instance = {}; - OutReason.clear(); - - return true; } -bool -Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::Hibernate(const std::string& ModuleId) { + ZEN_ASSERT(!m_ShutdownFlag.load()); + StorageServerInstance::ExclusiveLockedPtr Instance; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) + if (m_HibernatingModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + if (IsModuleInFlightLocked(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; } auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); - m_HibernatingModules.emplace(ModuleId); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Hibernated) + { + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort()); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_HibernatingModules tracks which modules are being hibernated, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(Instance); + + // Validate state while holding the exclusive instance lock (state cannot change). + // This gives a synchronous error response for invalid-state calls on both the sync + // and async paths, matching the existing behaviour. + if (Instance.GetState() != HubInstanceState::Provisioned) + { + const Response HibernateResp = + Response{EResponseCode::Rejected, + fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))}; + Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(ModuleId); + return HibernateResp; + } + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteHibernate(*Instance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo the latch increment and tracking-set membership. + // State has not changed so no callback is needed. + // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + Instance = {}; + { + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(std::string(ModuleId)); + } + throw; + } + } + else + { + CompleteHibernate(Instance); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? HubInstanceState OldState = Instance.GetState(); @@ -523,19 +761,17 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) auto RemoveHibernatingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(ModuleId); + m_HibernatingModules.erase(std::string(ModuleId)); }); - // NOTE: done while not holding the hub lock, as hibernation may take time. - // m_HibernatingModules tracks which modules are being hibernated, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. try { if (!Instance.Hibernate()) { - OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - return false; + ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); + Instance = {}; + return; } NewState = Instance.GetState(); Instance = {}; @@ -547,42 +783,116 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - - OutReason.clear(); - - return true; } -bool -Hub::Wake(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::Wake(const std::string& ModuleId) { + ZEN_ASSERT(!m_ShutdownFlag.load()); + StorageServerInstance::ExclusiveLockedPtr Instance; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) + if (m_WakingModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + if (IsModuleInFlightLocked(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; } auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); - m_WakingModules.emplace(ModuleId); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + { + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + m_WakingModules.emplace(ModuleId, Instance.GetBasePort()); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_WakingModules tracks which modules are being woken, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(Instance); + // Validate state while holding the exclusive instance lock (state cannot change). + // This gives a synchronous error response for invalid-state calls on both the sync + // and async paths, matching the existing behaviour. + if (Instance.GetState() != HubInstanceState::Hibernated) + { + const Response WakeResp = + Response{EResponseCode::Rejected, + fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))}; + Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(ModuleId); + return WakeResp; + } + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteWake(*Instance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo the latch increment and tracking-set membership. + // State has not changed so no callback is needed. + // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + Instance = {}; + { + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(std::string(ModuleId)); + } + throw; + } + } + else + { + CompleteWake(Instance); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? HubInstanceState OldState = Instance.GetState(); @@ -591,19 +901,17 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) auto RemoveWakingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(ModuleId); + m_WakingModules.erase(std::string(ModuleId)); }); - // NOTE: done while not holding the hub lock, as waking may take time. - // m_WakingModules tracks which modules are being woken, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. try { if (!Instance.Wake()) { - OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - return false; + ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); + Instance = {}; + return; } NewState = Instance.GetState(); Instance = {}; @@ -615,10 +923,6 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - - OutReason.clear(); - - return true; } bool @@ -739,18 +1043,23 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return true; } +bool +Hub::IsModuleInFlightLocked(std::string_view ModuleId) const +{ + const std::string Key(ModuleId); + return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) || + m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key); +} + void Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; - StorageServerInstance* RawInstance = nullptr; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_RecoveringModules.contains(std::string(ModuleId)) || m_ProvisioningModules.contains(std::string(ModuleId)) || - m_DeprovisioningModules.contains(std::string(ModuleId)) || m_HibernatingModules.contains(std::string(ModuleId)) || - m_WakingModules.contains(std::string(ModuleId))) + if (IsModuleInFlightLocked(ModuleId)) { return; } @@ -763,38 +1072,42 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - RawInstance = m_ActiveInstances[ActiveInstanceIndex].get(); - Instance = RawInstance->LockExclusive(/*Wait*/ true); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + + // Definitive check while hub lock is held: exclusive instance lock is also held, + // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision) + // or the process restarted since the watchdog fired. + if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) + { + return; + } + m_RecoveringModules.emplace(std::string(ModuleId)); } ZEN_ASSERT(Instance); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + const uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; auto RemoveRecoveringModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_RecoveringModules.erase(std::string(ModuleId)); }); - // Re-validate: state may have changed between releasing shared lock and acquiring exclusive lock - if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) - { - return; - } - if (Instance.RecoverFromCrash()) { + // Spawn succeeded -- instance is back in Provisioned state. NewState = Instance.GetState(); Instance = {}; + OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); return; } - // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup. + // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down + // so any salvageable data is preserved. try { (void)Instance.Deprovision(); @@ -803,7 +1116,6 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); } - NewState = Instance.GetState(); Instance = {}; std::unique_ptr<StorageServerInstance> DestroyInstance; @@ -831,6 +1143,10 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); } + + // Notify after all cleanup -- port is back in m_FreePorts and the callback sees + // a consistent end-state: module gone, transition complete. + OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); } void @@ -942,9 +1258,10 @@ namespace hub_testutils { std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, - Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}) + Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, + WorkerThreadPool* WorkerPool = nullptr) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); } struct CallbackRecord @@ -989,10 +1306,8 @@ TEST_CASE("hub.provision_basic") CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); - CHECK(Reason.empty()); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; @@ -1004,9 +1319,8 @@ TEST_CASE("hub.provision_basic") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); - CHECK(Reason.empty()); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); @@ -1037,9 +1351,8 @@ TEST_CASE("hub.provision_config") CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; @@ -1056,8 +1369,8 @@ TEST_CASE("hub.provision_config") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); @@ -1076,10 +1389,9 @@ TEST_CASE("hub.provision_callbacks") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); { RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); @@ -1094,8 +1406,8 @@ TEST_CASE("hub.provision_callbacks") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); @@ -1121,27 +1433,24 @@ TEST_CASE("hub.instance_limit") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason); - REQUIRE_MESSAGE(FirstResult, Reason); + const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message); - const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason); - REQUIRE_MESSAGE(SecondResult, Reason); + const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info); + REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - Reason.clear(); - const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason); - CHECK_FALSE(ThirdResult); + const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info); + CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - CHECK_NE(Reason.find("instance limit"), std::string::npos); + CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos); - HubInstance->Deprovision("limit_a", Reason); + HubInstance->Deprovision("limit_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - Reason.clear(); - const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason); - CHECK_MESSAGE(FourthResult, Reason); + const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info); + CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); } @@ -1151,10 +1460,15 @@ TEST_CASE("hub.enumerate_modules") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason); - REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("enum_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Provision("enum_b", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } std::vector<std::string> Ids; int ProvisionedCount = 0; @@ -1172,7 +1486,7 @@ TEST_CASE("hub.enumerate_modules") CHECK(FoundA); CHECK(FoundB); - HubInstance->Deprovision("enum_a", Reason); + HubInstance->Deprovision("enum_a"); Ids.clear(); ProvisionedCount = 0; HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { @@ -1195,17 +1509,22 @@ TEST_CASE("hub.max_instance_count") CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("max_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); - REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("max_b", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - HubInstance->Deprovision("max_a", Reason); + HubInstance->Deprovision("max_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); } @@ -1228,8 +1547,8 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info); + REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message); } CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); @@ -1253,23 +1572,21 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); + ProvisionFutures[I] = + Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info); + ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; + ProvisionReasons[I] = Result.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = + Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I)); + DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); } for (std::future<void>& F : ProvisionFutures) @@ -1324,14 +1641,13 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } @@ -1344,14 +1660,13 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } } @@ -1366,11 +1681,12 @@ TEST_CASE("hub.hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; - std::string Reason; // Provision - REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { @@ -1379,9 +1695,8 @@ TEST_CASE("hub.hibernate_wake") } // Hibernate - const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason); - REQUIRE_MESSAGE(HibernateResult, Reason); - CHECK(Reason.empty()); + const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); { @@ -1390,9 +1705,8 @@ TEST_CASE("hub.hibernate_wake") } // Wake - const bool WakeResult = HubInstance->Wake("hib_a", Reason); - REQUIRE_MESSAGE(WakeResult, Reason); - CHECK(Reason.empty()); + const Hub::Response WakeResult = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { @@ -1401,9 +1715,8 @@ TEST_CASE("hub.hibernate_wake") } // Deprovision - const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason); - CHECK(DeprovisionResult); - CHECK(Reason.empty()); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_FALSE(HubInstance->Find("hib_a")); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); @@ -1419,36 +1732,131 @@ TEST_CASE("hub.hibernate_wake_errors") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; - std::string Reason; - // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404) - CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason)); - CHECK(Reason.empty()); + // Hibernate/wake on a non-existent module - returns NotFound (-> 404) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason)); - CHECK(Reason.empty()); + // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) + { + const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Hibernate("err_b"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } - // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400) - REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason); - CHECK(Reason.empty()); - REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response HibResp = HubInstance->Hibernate("err_b"); + CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); + } - Reason.clear(); - CHECK_FALSE(HubInstance->Hibernate("err_b", Reason)); - CHECK_FALSE(Reason.empty()); + // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) + { + const Hub::Response R = HubInstance->Wake("err_b"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } - // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400) - REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response WakeResp = HubInstance->Wake("err_b"); + CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); + } - Reason.clear(); - CHECK_FALSE(HubInstance->Wake("err_b", Reason)); - CHECK_FALSE(Reason.empty()); + // Deprovision not-found - returns NotFound (-> 404) + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); +} + +TEST_CASE("hub.async_hibernate_wake") +{ + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.BasePortNumber = 23000; + + WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + HubProvisionedInstanceInfo ProvInfo; + Hub::InstanceInfo Info; - // Deprovision not-found - should return false with empty reason (-> 404) - CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason)); - CHECK(Reason.empty()); + constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kTimeout = std::chrono::seconds(30); + + // Provision and wait until Provisioned + { + const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Ready = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Ready = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout"); + } + + // Hibernate asynchronously and poll until Hibernated + { + const Hub::Response R = HubInstance->Hibernate("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Hibernated = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated) + { + Hibernated = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Wake asynchronously and poll until Provisioned + { + const Hub::Response R = HubInstance->Wake("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Woken = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Woken = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Deprovision + { + const Hub::Response R = HubInstance->Deprovision("async_hib_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + CHECK_FALSE(HubInstance->Find("async_hib_a")); } TEST_CASE("hub.recover_process_crash") @@ -1457,8 +1865,10 @@ TEST_CASE("hub.recover_process_crash") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } // Kill the child process to simulate a crash, then poll until the watchdog detects it, // recovers the instance, and the new process is serving requests. @@ -1494,8 +1904,10 @@ TEST_CASE("hub.recover_process_crash_then_deprovision") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } // Kill the child process, wait for the watchdog to detect and recover the instance. HubInstance->TerminateModuleForTesting("module_a"); @@ -1518,16 +1930,166 @@ TEST_CASE("hub.recover_process_crash_then_deprovision") REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. - CHECK_MESSAGE(HubInstance->Deprovision("module_a", Reason), Reason); + { + const Hub::Response R = HubInstance->Deprovision("module_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_EQ(HubInstance->GetInstanceCount(), 0); HubProvisionedInstanceInfo NewInfo; - CHECK_MESSAGE(HubInstance->Provision("module_a", NewInfo, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", NewInfo); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_NE(NewInfo.Port, 0); HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); } +TEST_CASE("hub.async_provision_concurrent") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.BasePortNumber = 22800; + Config.InstanceLimit = kModuleCount; + + WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + std::vector<std::string> Reasons(kModuleCount); + std::vector<int> Results(kModuleCount, 0); + + { + WorkerThreadPool Callers(kModuleCount, "hub_async_callers"); + std::vector<std::future<void>> Futures(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0; + Reasons[I] = Resp.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + for (std::future<void>& F : Futures) + { + F.get(); + } + } + + for (int I = 0; I < kModuleCount; ++I) + { + REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]); + CHECK_NE(Infos[I].Port, 0); + } + + // Poll until all instances reach Provisioned state + constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kTimeout = std::chrono::seconds(30); + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + + bool AllProvisioned = false; + while (std::chrono::steady_clock::now() < Deadline) + { + int ProvisionedCount = 0; + for (int I = 0; I < kModuleCount; ++I) + { + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) + { + ++ProvisionedCount; + } + } + if (ProvisionedCount == kModuleCount) + { + AllProvisioned = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout"); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I)); + } + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); + CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); +} + +TEST_CASE("hub.async_provision_shutdown_waits") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.InstanceLimit = kModuleCount; + Config.BasePortNumber = 22900; + + WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message); + REQUIRE_NE(Infos[I].Port, 0); + } + + // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up. + HubInstance->Shutdown(); + + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_FALSE(ModClient.Get("/health/")); + } +} + +TEST_CASE("hub.async_provision_rejected") +{ + // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present. + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.InstanceLimit = 1; + Config.BasePortNumber = 23100; + + WorkerThreadPool WorkerPool(2, "hub_async_rejected"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + HubProvisionedInstanceInfo Info; + + // First provision: dispatched to WorkerPool, returns Accepted + const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message); + REQUIRE_NE(Info.Port, 0); + + // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected + HubProvisionedInstanceInfo Info2; + const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2); + CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_FALSE(SecondResult.Message.empty()); + CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); +} + TEST_SUITE_END(); void |