aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r--src/zenserver/hub/hub.cpp1090
1 files changed, 826 insertions, 264 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 6a2609443..ceba21d8d 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -10,6 +10,7 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
# include <zenhttp/httpclient.h>
#endif
@@ -122,9 +122,14 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
+Hub::Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ WorkerThreadPool* OptionalWorkerPool,
+ AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
+, m_WorkerPool(OptionalWorkerPool)
+, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
{
m_HostMetrics = GetSystemMetrics();
@@ -196,42 +201,44 @@ Hub::Shutdown()
m_WatchDog = {};
- m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
- {
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- {
- StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true));
+ bool Expected = false;
+ bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ // Shutdown flag is set and all background work is drained, safe to shut down remaining instances
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+ m_BackgroundWorkLatch.Reset(1);
+ }
- try
- {
- (void)Instance.Deprovision();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
- }
- // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire.
- NewState = HubInstanceState::Unprovisioned;
- Instance = {};
+ EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
+ ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings...
+ try
+ {
+ const Response DepResp = InternalDeprovision(std::string(ModuleId));
+ if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
+ {
+ ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
}
- InstanceRaw.reset();
}
- m_InstanceLookup.clear();
- m_ActiveInstances.clear();
- m_FreeActiveInstanceIndexes.clear();
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
+ }
});
+
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ }
}
-bool
-Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+Hub::Response
+Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
bool IsNewInstance = false;
uint16_t AllocatedPort = 0;
@@ -245,6 +252,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
}
});
+ if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end())
+ {
+ // Same operation already in flight -- return the already-allocated port.
+ // RestoreAllocatedPort is a no-op here because IsNewInstance is still false
+ // (we return before line 273 where it is set), so no port is double-freed.
+ OutInfo.Port = It->second;
+ return Response{EResponseCode::Accepted};
+ }
+
if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
std::string Reason;
@@ -252,9 +268,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
{
ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
- OutReason = Reason;
-
- return false;
+ return Response{EResponseCode::Rejected, Reason};
}
IsNewInstance = true;
@@ -313,21 +327,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
if (m_RecoveringModules.contains(std::string(ModuleId)))
{
- OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId);
ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
}
std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- AllocatedPort = InstanceRaw->GetBasePort();
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ {
+ OutInfo.Port = InstanceRaw->GetBasePort();
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ AllocatedPort = InstanceRaw->GetBasePort();
}
- m_ProvisioningModules.emplace(std::string(ModuleId));
+ m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort);
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_ProvisioningModules tracks which modules are being provisioned, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ AllocatedPort,
+ IsNewInstance,
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteProvision(*Instance, AllocatedPort, IsNewInstance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ if (IsNewInstance)
+ {
+ try
+ {
+ AbortProvision(ModuleId);
+ }
+ catch (const std::exception& DestroyEx)
+ {
+ ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
+ }
+ }
+
+ Instance = {};
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ProvisioningModules.erase(std::string(ModuleId));
+ if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
+ {
+ m_FreePorts.push_back(AllocatedPort);
+ }
+ }
+
+ throw;
+ }
+ }
+ else
+ {
+ CompleteProvision(Instance, AllocatedPort, IsNewInstance);
+ }
+
+ OutInfo.Port = AllocatedPort;
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -340,47 +433,34 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
{
m_FreePorts.push_back(AllocatedPort);
- AllocatedPort = 0;
}
});
- // NOTE: this is done while not holding the hub lock, as provisioning may take time
- // and we don't want to block other operations. We track which modules are being
- // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
- // those modules while in this state.
-
- try
+ if (m_ShutdownFlag.load() == false)
{
- (void)Instance.Provision(); // false = already in target state (idempotent); not an error
- NewState = Instance.GetState();
- Instance = {};
+ try
+ {
+ (void)Instance.Provision(); // false = already in target state (idempotent); not an error
+ NewState = Instance.GetState();
+ AllocatedPort = 0;
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ }
}
- catch (const std::exception& Ex)
+
+ if (Instance)
{
- ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
NewState = Instance.GetState();
Instance = {};
if (IsNewInstance)
{
- // Clean up failed instance provisioning
- std::unique_ptr<StorageServerInstance> DestroyInstance;
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
- {
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
- ZEN_ASSERT(DestroyInstance);
- ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- }
- }
try
{
- DestroyInstance.reset();
+ AbortProvision(ModuleId);
NewState = HubInstanceState::Unprovisioned;
}
catch (const std::exception& DestroyEx)
@@ -388,18 +468,42 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
}
}
- throw;
}
+}
- OutReason.clear();
- OutInfo.Port = AllocatedPort;
- // TODO: base URI? Would need to know what host name / IP to use
+void
+Hub::AbortProvision(std::string_view ModuleId)
+{
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
+ {
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
+ ZEN_ASSERT(DestroyInstance);
+ ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ }
+ else
+ {
+ ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId);
+ }
+ }
+ DestroyInstance.reset();
+}
- return true;
+Hub::Response
+Hub::Deprovision(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+ return InternalDeprovision(ModuleId);
}
-bool
-Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::InternalDeprovision(const std::string& ModuleId)
{
std::unique_ptr<StorageServerInstance> RawInstance;
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -409,26 +513,27 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
if (m_ProvisioningModules.contains(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
-
ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)};
}
if (m_RecoveringModules.contains(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId);
ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ }
+
+ if (m_DeprovisioningModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
}
if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
{
ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
else
{
@@ -436,33 +541,90 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
ZEN_ASSERT(RawInstance != nullptr);
+
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
m_InstanceLookup.erase(It);
- m_DeprovisioningModules.emplace(ModuleId);
+ m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort());
Instance = RawInstance->LockExclusive(/*Wait*/ true);
}
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(RawInstance);
ZEN_ASSERT(Instance);
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)),
+ RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteDeprovision(*Instance);
+ RawInstance.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ uint16_t BasePort = Instance.GetBasePort();
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {});
+
+ // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
+ Instance = {};
+ NewState = HubInstanceState::Unprovisioned;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(std::string(ModuleId));
+ m_FreePorts.push_back(BasePort);
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteDeprovision(Instance);
+ RawInstance.reset();
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
HubInstanceState NewState = OldState;
InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
- // The module is deprovisioned outside the hub lock to avoid blocking other operations.
- //
- // To ensure that no new provisioning can occur while we're deprovisioning,
- // we add the module ID to m_DeprovisioningModules and remove it once
- // deprovisioning is complete.
-
auto _ = MakeGuard([&] {
{
RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(ModuleId);
+ m_DeprovisioningModules.erase(std::string(ModuleId));
m_FreePorts.push_back(BasePort);
}
});
@@ -470,6 +632,8 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
try
{
(void)Instance.Deprovision();
+ NewState = Instance.GetState();
+ Instance = {};
}
catch (const std::exception& Ex)
{
@@ -479,42 +643,116 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
- NewState = Instance.GetState();
- Instance = {};
- OutReason.clear();
-
- return true;
}
-bool
-Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::Hibernate(const std::string& ModuleId)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
StorageServerInstance::ExclusiveLockedPtr Instance;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
- m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId))
+ if (m_HibernatingModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ if (IsModuleInFlightLocked(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
}
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
- m_HibernatingModules.emplace(ModuleId);
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Hibernated)
+ {
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort());
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_HibernatingModules tracks which modules are being hibernated, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+
+ // Validate state while holding the exclusive instance lock (state cannot change).
+ // This gives a synchronous error response for invalid-state calls on both the sync
+ // and async paths, matching the existing behaviour.
+ if (Instance.GetState() != HubInstanceState::Provisioned)
+ {
+ const Response HibernateResp =
+ Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))};
+ Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_HibernatingModules.erase(ModuleId);
+ return HibernateResp;
+ }
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteHibernate(*Instance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo the latch increment and tracking-set membership.
+ // State has not changed so no callback is needed.
+ // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_HibernatingModules.erase(std::string(ModuleId));
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteHibernate(Instance);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -523,19 +761,17 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
auto RemoveHibernatingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
- m_HibernatingModules.erase(ModuleId);
+ m_HibernatingModules.erase(std::string(ModuleId));
});
- // NOTE: done while not holding the hub lock, as hibernation may take time.
- // m_HibernatingModules tracks which modules are being hibernated, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
try
{
if (!Instance.Hibernate())
{
- OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- return false;
+ ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
+ Instance = {};
+ return;
}
NewState = Instance.GetState();
Instance = {};
@@ -547,42 +783,116 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
-
- OutReason.clear();
-
- return true;
}
-bool
-Hub::Wake(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::Wake(const std::string& ModuleId)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
StorageServerInstance::ExclusiveLockedPtr Instance;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
- m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId))
+ if (m_WakingModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ if (IsModuleInFlightLocked(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
}
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
- m_WakingModules.emplace(ModuleId);
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ {
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ m_WakingModules.emplace(ModuleId, Instance.GetBasePort());
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_WakingModules tracks which modules are being woken, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+ // Validate state while holding the exclusive instance lock (state cannot change).
+ // This gives a synchronous error response for invalid-state calls on both the sync
+ // and async paths, matching the existing behaviour.
+ if (Instance.GetState() != HubInstanceState::Hibernated)
+ {
+ const Response WakeResp =
+ Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))};
+ Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_WakingModules.erase(ModuleId);
+ return WakeResp;
+ }
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteWake(*Instance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo the latch increment and tracking-set membership.
+ // State has not changed so no callback is needed.
+ // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_WakingModules.erase(std::string(ModuleId));
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteWake(Instance);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -591,19 +901,17 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
auto RemoveWakingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
- m_WakingModules.erase(ModuleId);
+ m_WakingModules.erase(std::string(ModuleId));
});
- // NOTE: done while not holding the hub lock, as waking may take time.
- // m_WakingModules tracks which modules are being woken, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
try
{
if (!Instance.Wake())
{
- OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- return false;
+ ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
+ Instance = {};
+ return;
}
NewState = Instance.GetState();
Instance = {};
@@ -615,10 +923,6 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
-
- OutReason.clear();
-
- return true;
}
bool
@@ -739,18 +1043,23 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return true;
}
+bool
+Hub::IsModuleInFlightLocked(std::string_view ModuleId) const
+{
+ const std::string Key(ModuleId);
+ return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) ||
+ m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key);
+}
+
void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
- StorageServerInstance* RawInstance = nullptr;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_RecoveringModules.contains(std::string(ModuleId)) || m_ProvisioningModules.contains(std::string(ModuleId)) ||
- m_DeprovisioningModules.contains(std::string(ModuleId)) || m_HibernatingModules.contains(std::string(ModuleId)) ||
- m_WakingModules.contains(std::string(ModuleId)))
+ if (IsModuleInFlightLocked(ModuleId))
{
return;
}
@@ -763,38 +1072,42 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- RawInstance = m_ActiveInstances[ActiveInstanceIndex].get();
- Instance = RawInstance->LockExclusive(/*Wait*/ true);
+ Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
+
+ // Definitive check while hub lock is held: exclusive instance lock is also held,
+ // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision)
+ // or the process restarted since the watchdog fired.
+ if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
+ {
+ return;
+ }
+
m_RecoveringModules.emplace(std::string(ModuleId));
}
ZEN_ASSERT(Instance);
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+ const uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
auto RemoveRecoveringModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_RecoveringModules.erase(std::string(ModuleId));
});
- // Re-validate: state may have changed between releasing shared lock and acquiring exclusive lock
- if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
- {
- return;
- }
-
if (Instance.RecoverFromCrash())
{
+ // Spawn succeeded -- instance is back in Provisioned state.
NewState = Instance.GetState();
Instance = {};
+ OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
return;
}
- // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup.
+ // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down
+ // so any salvageable data is preserved.
try
{
(void)Instance.Deprovision();
@@ -803,7 +1116,6 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
}
- NewState = Instance.GetState();
Instance = {};
std::unique_ptr<StorageServerInstance> DestroyInstance;
@@ -831,6 +1143,10 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what());
}
+
+ // Notify after all cleanup -- port is back in m_FreePorts and the callback sees
+ // a consistent end-state: module gone, transition complete.
+ OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
}
void
@@ -942,9 +1258,10 @@ namespace hub_testutils {
std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
Hub::Configuration Config = {},
- Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {})
+ Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {},
+ WorkerThreadPool* WorkerPool = nullptr)
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback));
}
struct CallbackRecord
@@ -989,10 +1306,8 @@ TEST_CASE("hub.provision_basic")
CHECK_FALSE(HubInstance->Find("module_a"));
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
Hub::InstanceInfo InstanceInfo;
@@ -1004,9 +1319,8 @@ TEST_CASE("hub.provision_basic")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
- CHECK(Reason.empty());
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
@@ -1037,9 +1351,8 @@ TEST_CASE("hub.provision_config")
CHECK_FALSE(HubInstance->Find("module_a"));
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
Hub::InstanceInfo InstanceInfo;
@@ -1056,8 +1369,8 @@ TEST_CASE("hub.provision_config")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
@@ -1076,10 +1389,9 @@ TEST_CASE("hub.provision_callbacks")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
{
RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
@@ -1094,8 +1406,8 @@ TEST_CASE("hub.provision_callbacks")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
{
HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
@@ -1121,27 +1433,24 @@ TEST_CASE("hub.instance_limit")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason);
- REQUIRE_MESSAGE(FirstResult, Reason);
+ const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message);
- const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason);
- REQUIRE_MESSAGE(SecondResult, Reason);
+ const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info);
+ REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- Reason.clear();
- const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason);
- CHECK_FALSE(ThirdResult);
+ const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info);
+ CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- CHECK_NE(Reason.find("instance limit"), std::string::npos);
+ CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos);
- HubInstance->Deprovision("limit_a", Reason);
+ HubInstance->Deprovision("limit_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- Reason.clear();
- const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason);
- CHECK_MESSAGE(FourthResult, Reason);
+ const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info);
+ CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
@@ -1151,10 +1460,15 @@ TEST_CASE("hub.enumerate_modules")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason);
- REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("enum_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Provision("enum_b", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
std::vector<std::string> Ids;
int ProvisionedCount = 0;
@@ -1172,7 +1486,7 @@ TEST_CASE("hub.enumerate_modules")
CHECK(FoundA);
CHECK(FoundB);
- HubInstance->Deprovision("enum_a", Reason);
+ HubInstance->Deprovision("enum_a");
Ids.clear();
ProvisionedCount = 0;
HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
@@ -1195,17 +1509,22 @@ TEST_CASE("hub.max_instance_count")
CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("max_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
- REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("max_b", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
- HubInstance->Deprovision("max_a", Reason);
+ HubInstance->Deprovision("max_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
}
@@ -1228,8 +1547,8 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info);
+ REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message);
}
CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
@@ -1253,23 +1572,21 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
- ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool Result =
- HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
- ProvisionResults[I] = Result ? 1 : 0;
- ProvisionReasons[I] = Reason;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
-
- DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
- std::string Reason;
- const bool Result =
- HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
- DeprovisionResults[I] = Result ? 1 : 0;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
+ ProvisionFutures[I] =
+ Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info);
+ ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ ProvisionReasons[I] = Result.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ DeprovisionFutures[I] =
+ Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I));
+ DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (std::future<void>& F : ProvisionFutures)
@@ -1324,14 +1641,13 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
@@ -1344,14 +1660,13 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
}
@@ -1366,11 +1681,12 @@ TEST_CASE("hub.hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
- std::string Reason;
// Provision
- REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
@@ -1379,9 +1695,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Hibernate
- const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason);
- REQUIRE_MESSAGE(HibernateResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message);
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Hibernated);
{
@@ -1390,9 +1705,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Wake
- const bool WakeResult = HubInstance->Wake("hib_a", Reason);
- REQUIRE_MESSAGE(WakeResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response WakeResult = HubInstance->Wake("hib_a");
+ REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message);
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
@@ -1401,9 +1715,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Deprovision
- const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason);
- CHECK(DeprovisionResult);
- CHECK(Reason.empty());
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_FALSE(HubInstance->Find("hib_a"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
@@ -1419,36 +1732,131 @@ TEST_CASE("hub.hibernate_wake_errors")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo ProvInfo;
- std::string Reason;
- // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404)
- CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason));
- CHECK(Reason.empty());
+ // Hibernate/wake on a non-existent module - returns NotFound (-> 404)
+ CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
- CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason));
- CHECK(Reason.empty());
+ // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent)
+ {
+ const Hub::Response R = HubInstance->Provision("err_b", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Hibernate("err_b");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
- // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400)
- REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason);
- CHECK(Reason.empty());
- REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response HibResp = HubInstance->Hibernate("err_b");
+ CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed);
+ }
- Reason.clear();
- CHECK_FALSE(HubInstance->Hibernate("err_b", Reason));
- CHECK_FALSE(Reason.empty());
+ // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent)
+ {
+ const Hub::Response R = HubInstance->Wake("err_b");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
- // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400)
- REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response WakeResp = HubInstance->Wake("err_b");
+ CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed);
+ }
- Reason.clear();
- CHECK_FALSE(HubInstance->Wake("err_b", Reason));
- CHECK_FALSE(Reason.empty());
+ // Deprovision not-found - returns NotFound (-> 404)
+ CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+}
+
+TEST_CASE("hub.async_hibernate_wake")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23000;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_hib_wake");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ Hub::InstanceInfo Info;
- // Deprovision not-found - should return false with empty reason (-> 404)
- CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason));
- CHECK(Reason.empty());
+ constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+
+ // Provision and wait until Provisioned
+ {
+ const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Ready = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Ready = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout");
+ }
+
+ // Hibernate asynchronously and poll until Hibernated
+ {
+ const Hub::Response R = HubInstance->Hibernate("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Hibernated = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated)
+ {
+ Hibernated = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Wake asynchronously and poll until Provisioned
+ {
+ const Hub::Response R = HubInstance->Wake("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Woken = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Woken = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Deprovision
+ {
+ const Hub::Response R = HubInstance->Deprovision("async_hib_a");
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ CHECK_FALSE(HubInstance->Find("async_hib_a"));
}
TEST_CASE("hub.recover_process_crash")
@@ -1457,8 +1865,10 @@ TEST_CASE("hub.recover_process_crash")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
// Kill the child process to simulate a crash, then poll until the watchdog detects it,
// recovers the instance, and the new process is serving requests.
@@ -1494,8 +1904,10 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
// Kill the child process, wait for the watchdog to detect and recover the instance.
HubInstance->TerminateModuleForTesting("module_a");
@@ -1518,16 +1930,166 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// After recovery, deprovision should succeed and a re-provision should work.
- CHECK_MESSAGE(HubInstance->Deprovision("module_a", Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Deprovision("module_a");
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
HubProvisionedInstanceInfo NewInfo;
- CHECK_MESSAGE(HubInstance->Provision("module_a", NewInfo, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", NewInfo);
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_NE(NewInfo.Port, 0);
HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout);
CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests");
}
+TEST_CASE("hub.async_provision_concurrent")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22800;
+ Config.InstanceLimit = kModuleCount;
+
+ WorkerThreadPool WorkerPool(4, "hub_async_concurrent");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+ std::vector<std::string> Reasons(kModuleCount);
+ std::vector<int> Results(kModuleCount, 0);
+
+ {
+ WorkerThreadPool Callers(kModuleCount, "hub_async_callers");
+ std::vector<std::future<void>> Futures(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0;
+ Reasons[I] = Resp.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ for (std::future<void>& F : Futures)
+ {
+ F.get();
+ }
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]);
+ CHECK_NE(Infos[I].Port, 0);
+ }
+
+ // Poll until all instances reach Provisioned state
+ constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+
+ bool AllProvisioned = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ int ProvisionedCount = 0;
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Hub::InstanceInfo InstanceInfo;
+ if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ++ProvisionedCount;
+ }
+ }
+ if (ProvisionedCount == kModuleCount)
+ {
+ AllProvisioned = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout");
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I));
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
+ CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+}
+
+TEST_CASE("hub.async_provision_shutdown_waits")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = kModuleCount;
+ Config.BasePortNumber = 22900;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_shutdown");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message);
+ REQUIRE_NE(Infos[I].Port, 0);
+ }
+
+ // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up.
+ HubInstance->Shutdown();
+
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_FALSE(ModClient.Get("/health/"));
+ }
+}
+
+TEST_CASE("hub.async_provision_rejected")
+{
+ // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present.
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = 1;
+ Config.BasePortNumber = 23100;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_rejected");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ HubProvisionedInstanceInfo Info;
+
+ // First provision: dispatched to WorkerPool, returns Accepted
+ const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message);
+ REQUIRE_NE(Info.Port, 0);
+
+ // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected
+ HubProvisionedInstanceInfo Info2;
+ const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2);
+ CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_FALSE(SecondResult.Message.empty());
+ CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos);
+ CHECK_EQ(HubInstance->GetInstanceCount(), 1);
+}
+
TEST_SUITE_END();
void