aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r--src/zenserver/hub/hub.cpp2770
1 files changed, 2353 insertions, 417 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 2f3873884..0cb25fdd6 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -9,6 +9,9 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -16,11 +19,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
-# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <zenhttp/httpclient.h>
#endif
#include <numeric>
@@ -121,29 +121,90 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- ProvisionModuleCallbackFunc&& ProvisionedModuleCallback,
- ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback)
+ProcessMetrics
+Hub::AtomicProcessMetrics::Load() const
+{
+ return {
+ .MemoryBytes = MemoryBytes.load(),
+ .KernelTimeMs = KernelTimeMs.load(),
+ .UserTimeMs = UserTimeMs.load(),
+ .WorkingSetSize = WorkingSetSize.load(),
+ .PeakWorkingSetSize = PeakWorkingSetSize.load(),
+ .PagefileUsage = PagefileUsage.load(),
+ .PeakPagefileUsage = PeakPagefileUsage.load(),
+ };
+}
+
+void
+Hub::AtomicProcessMetrics::Store(const ProcessMetrics& Metrics)
+{
+ MemoryBytes.store(Metrics.MemoryBytes);
+ KernelTimeMs.store(Metrics.KernelTimeMs);
+ UserTimeMs.store(Metrics.UserTimeMs);
+ WorkingSetSize.store(Metrics.WorkingSetSize);
+ PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize);
+ PagefileUsage.store(Metrics.PagefileUsage);
+ PeakPagefileUsage.store(Metrics.PeakPagefileUsage);
+}
+
+void
+Hub::AtomicProcessMetrics::Reset()
+{
+ MemoryBytes.store(0);
+ KernelTimeMs.store(0);
+ UserTimeMs.store(0);
+ WorkingSetSize.store(0);
+ PeakWorkingSetSize.store(0);
+ PagefileUsage.store(0);
+ PeakPagefileUsage.store(0);
+}
+
+void
+Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const
+{
+ m_Lock.WithSharedLock([&]() {
+ OutSystemMetrict = m_SystemMetrics;
+ OutDiskSpace = m_DiskSpace;
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
-, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback))
-, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback))
+, m_WorkerPool(Config.OptionalProvisionWorkerPool)
+, m_BackgroundWorkLatch(1)
+, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
+, m_ActiveInstances(Config.InstanceLimit)
+, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
- m_HostMetrics = GetSystemMetrics();
- m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
- m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
+ ZEN_ASSERT_FORMAT(
+ Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr,
+ "Provision and hydration worker pools must be distinct to avoid deadlocks");
- m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
- ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);
+ if (!m_Config.HydrationTargetSpecification.empty())
+ {
+ m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ }
+ else if (!m_Config.HydrationOptions)
+ {
+ std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
+ ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
+ m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
+ }
+ else
+ {
+ m_HydrationOptions = m_Config.HydrationOptions;
+ }
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max());
- m_FreePorts.resize(Config.InstanceLimit);
- std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber);
+ m_InstanceLookup.reserve(Config.InstanceLimit);
+ std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0);
#if ZEN_PLATFORM_WINDOWS
if (m_Config.UseJobObject)
@@ -159,351 +220,1743 @@ Hub::Hub(const Configuration& Config,
}
}
#endif
+
+ UpdateMachineMetrics();
+
+ m_WatchDog = std::thread([this]() { WatchDog(); });
}
Hub::~Hub()
{
try
{
- ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+        // Safety call - the owner should normally have called Shutdown() before destruction
+ if (!m_ShutdownFlag.load())
+ {
+ Shutdown();
+ }
+ }
+ catch (const std::exception& e)
+ {
+ ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ }
+}
+
+void
+Hub::Shutdown()
+{
+ ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+
+ bool Expected = false;
+ bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
+
+ m_WatchDogEvent.Set();
+ if (m_WatchDog.joinable())
+ {
+ m_WatchDog.join();
+ }
+
+ m_WatchDog = {};
+
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ // Shutdown flag is set and all background work is drained, safe to shut down remaining instances
+
+ m_BackgroundWorkLatch.Reset(1);
+ }
- m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, Instance] : m_Instances)
+ EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
+ ZEN_UNUSED(Info);
+ try
+ {
+ const Response DepResp = InternalDeprovision(std::string(ModuleId), [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
+ if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
{
- uint16_t BasePort = Instance->GetBasePort();
- std::string BaseUri; // TODO?
+ ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
+ }
+ });
- if (m_DeprovisionedModuleCallback)
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ }
+}
+
+Hub::Response
+Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ bool IsNewInstance = false;
+ size_t ActiveInstanceIndex = (size_t)-1;
+ HubInstanceState OldState = HubInstanceState::Unprovisioned;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
+ {
+ std::string Reason;
+ if (!CanProvisionInstanceLocked(ModuleId, /* out */ Reason))
+ {
+ ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
+
+ return Response{EResponseCode::Rejected, Reason};
+ }
+
+ IsNewInstance = true;
+
+ ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front();
+ m_FreeActiveInstanceIndexes.pop_front();
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
+
+ try
+ {
+ auto NewInstance = std::make_unique<StorageServerInstance>(
+ m_RunEnvironment,
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HydrationTargetSpecification = m_HydrationTargetSpecification,
+ .HydrationOptions = m_HydrationOptions,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ ModuleId);
+
+#if ZEN_PLATFORM_WINDOWS
+ if (m_JobObject.IsValid())
+ {
+ NewInstance->SetJobObject(&m_JobObject);
+ }
+#endif
+
+ Instance = NewInstance->LockExclusive(/*Wait*/ true);
+
+ m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
+ m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset();
+ m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
+ // Set Provisioning while both hub lock and instance lock are held so that any
+ // concurrent Deprovision sees the in-flight state, not Unprovisioned.
+ OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
+ }
+ catch (const std::exception&)
+ {
+ Instance = {};
+ m_ActiveInstances[ActiveInstanceIndex].Instance.reset();
+ m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned);
+ m_InstanceLookup.erase(std::string(ModuleId));
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ throw;
+ }
+
+ OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex);
+
+ ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
+
+ const int CurrentInstanceCount = gsl::narrow_cast<int>(m_InstanceLookup.size());
+ int CurrentMaxCount = m_MaxInstanceCount.load();
+ const int NewMax = Max(CurrentMaxCount, CurrentInstanceCount);
+
+ m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
+ }
+ else
+ {
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
+
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+
+ OutInfo.Port = InstanceRaw->GetBasePort();
+
+ switch (CurrentState)
+ {
+ case HubInstanceState::Provisioning:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Unprovisioned:
+ break;
+ case HubInstanceState::Provisioned:
+ return Response{EResponseCode::Completed};
+ case HubInstanceState::Hibernated:
+ _.ReleaseNow();
+ return Wake(std::string(ModuleId));
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+
+ // Re-validate state after acquiring the instance lock: a concurrent Provision may have
+ // completed between our hub-lock read and LockExclusive, transitioning the state away
+ // from Crashed/Unprovisioned.
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned)
+ {
+ Instance = {};
+ if (ActualState == HubInstanceState::Provisioned)
+ {
+ return Response{EResponseCode::Completed};
+ }
+ if (ActualState == HubInstanceState::Provisioning)
{
+ return Response{EResponseCode::Accepted};
+ }
+ return Response{
+ EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))};
+ }
+ // Set Provisioning while both hub lock and instance lock are held so that any
+ // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned.
+ OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
+ }
+ }
+
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // Both hub-lock paths above set OldState and updated the state to Provisioning before
+ // releasing the hub lock, so concurrent operations already see the in-flight state.
+
+ ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {});
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ ActiveInstanceIndex,
+ OldState,
+ IsNewInstance,
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
try
{
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
+ CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance);
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
}
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo latch increment and roll back state.
+ ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {});
+
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ if (IsNewInstance)
+ {
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(std::string(ModuleId));
}
- Instance->Deprovision();
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
- m_Instances.clear();
- });
+ DestroyInstance.reset();
+
+ throw;
+ }
}
- catch (const std::exception& e)
+ else
{
- ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance);
}
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
-bool
-Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+void
+Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance)
{
- StorageServerInstance* Instance = nullptr;
- bool IsNewInstance = false;
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+
+ if (m_ShutdownFlag.load() == false)
+ {
+ try
+ {
+ switch (OldState)
+ {
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Unprovisioned:
+ Instance.Provision();
+ break;
+ case HubInstanceState::Hibernated:
+ ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri);
+ Instance = {};
+ return;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ // Instance will be notified and removed below.
+ }
+ }
+
+ if (IsNewInstance)
+ {
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {});
+ Instance = {};
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(std::string(ModuleId));
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DestroyInstance.reset();
+ }
+ else
+ {
+ // OldState = Crashed: restore without cleanup (instance stays in lookup)
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {});
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ Instance = {};
+ }
+}
+
+Hub::Response
+Hub::Deprovision(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+ return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
+}
+
+Hub::Response
+Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate)
+{
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- uint16_t AllocatedPort = 0;
- auto RestoreAllocatedPort = MakeGuard([this, &AllocatedPort]() {
- if (AllocatedPort != 0)
- {
- m_FreePorts.push_back(AllocatedPort);
- AllocatedPort = 0;
- }
- });
- if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
+ if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
{
- std::string Reason;
- if (!CanProvisionInstance(ModuleId, /* out */ Reason))
+ ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
+
+ return Response{EResponseCode::NotFound};
+ }
+ else
+ {
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+
+ if (!DeprovisionGate(m_ActiveInstances[ActiveInstanceIndex]))
{
- ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' deprovision denied by gate", ModuleId)};
+ }
- OutReason = Reason;
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
- return false;
+ switch (CurrentState)
+ {
+ case HubInstanceState::Deprovisioning:
+ case HubInstanceState::Obliterating:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Crashed:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Provisioned:
+ break;
+ case HubInstanceState::Unprovisioned:
+ return Response{EResponseCode::Completed};
+ case HubInstanceState::Recovering:
+ // Recovering is watchdog-managed; reject to avoid interfering with the in-progress
+ // recovery. The watchdog will transition to Provisioned or Unprovisioned, after
+ // which deprovision can be retried.
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
}
- AllocatedPort = m_FreePorts.front();
- m_FreePorts.pop_front();
+ std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(RawInstance != nullptr);
- IsNewInstance = true;
- auto NewInstance = std::make_unique<StorageServerInstance>(
- m_RunEnvironment,
- StorageServerInstance::Configuration{.BasePort = AllocatedPort,
- .HydrationTempPath = m_HydrationTempPath,
- .FileHydrationPath = m_FileHydrationPath,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath},
- ModuleId);
-#if ZEN_PLATFORM_WINDOWS
- if (m_JobObject.IsValid())
+ Instance = RawInstance->LockExclusive(/*Wait*/ true);
+ }
+ }
+
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // The exclusive instance lock acquired above prevents concurrent LockExclusive callers
+ // from modifying instance state. The state transition to Deprovisioning happens below,
+ // after the hub lock is released.
+
+ ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {});
+
+ if (m_WorkerPool)
+ {
+ std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
+ std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
+
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo latch increment and roll back state.
+ ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {});
{
- NewInstance->SetJobObject(&m_JobObject);
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
}
-#endif
- Instance = NewInstance.get();
- m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));
- AllocatedPort = 0;
- ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
+ throw;
}
- else
+ }
+ else
+ {
+ CompleteDeprovision(Instance, ActiveInstanceIndex, OldState);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+Hub::Response
+Hub::Obliterate(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
{
- Instance = It->second.get();
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
+ {
+ case HubInstanceState::Obliterating:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Provisioned:
+ case HubInstanceState::Hibernated:
+ case HubInstanceState::Crashed:
+ break;
+ case HubInstanceState::Deprovisioning:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is being deprovisioned, retry after completion", ModuleId)};
+ case HubInstanceState::Recovering:
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ case HubInstanceState::Unprovisioned:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
+ }
+
+ std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(RawInstance != nullptr);
+
+ Instance = RawInstance->LockExclusive(/*Wait*/ true);
}
+ else
+ {
+ // Module not tracked by hub - obliterate backend data directly.
+ // Covers the deprovisioned case where data was preserved via dehydration.
+ if (m_ObliteratingInstances.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ m_ObliteratingInstances.insert(ModuleId);
+ Lock.ReleaseNow();
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId)]() {
+ auto Guard = MakeGuard([this, ModuleId]() {
+ m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); });
+ m_BackgroundWorkLatch.CountDown();
+ });
+ try
+ {
+ ObliterateBackendData(ModuleId);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async obliterate of untracked module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed to dispatch async obliterate of untracked module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ObliteratingInstances.erase(ModuleId);
+ }
+ throw;
+ }
+
+ return Response{EResponseCode::Accepted};
+ }
+
+ auto _ = MakeGuard([this, &ModuleId]() {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ObliteratingInstances.erase(ModuleId);
+ });
+
+ ObliterateBackendData(ModuleId);
- m_ProvisioningModules.emplace(std::string(ModuleId));
+ return Response{EResponseCode::Completed};
+ }
}
- ZEN_ASSERT(Instance != nullptr);
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Obliterating);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {});
- auto RemoveProvisioningModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_ProvisioningModules.erase(std::string(ModuleId));
- });
+ if (m_WorkerPool)
+ {
+ std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr =
+ std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance));
+
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteObliterate(*Instance, ActiveInstanceIndex);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
- // NOTE: this is done while not holding the lock, as provisioning may take time
- // and we don't want to block other operations. We track which modules are being
- // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
- // those modules while in this state.
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {});
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
+ }
- UpdateStats();
+ throw;
+ }
+ }
+ else
+ {
+ CompleteObliterate(Instance, ActiveInstanceIndex);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
+{
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
try
{
- Instance->Provision();
+ Instance.Obliterate();
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
- if (IsNewInstance)
+ ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what());
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
+ }
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {});
+ throw;
+ }
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {});
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
+}
+
+void
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
+{
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
+
+ try
+ {
+ if (OldState == HubInstanceState::Provisioned)
{
- // Clean up
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ ZEN_INFO("Triggering GC for module {}", ModuleId);
+
+ HttpClient GcClient(fmt::format("http://localhost:{}", Port));
+
+ HttpClient::KeyValueMap Params;
+ Params.Entries.insert({"smallobjects", "true"});
+ Params.Entries.insert({"skipcid", "false"});
+ HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params);
+ Stopwatch Timer;
+ while (Response && Timer.GetElapsedTimeMs() < 5000)
{
- ZEN_ASSERT(It->second != nullptr);
- uint16_t BasePort = It->second->GetBasePort();
- m_FreePorts.push_back(BasePort);
- m_Instances.erase(It);
+ Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Response)
+ {
+ bool Complete = Response.AsObject()["Status"].AsString() != "Running";
+ if (Complete)
+ {
+ break;
+ }
+ Sleep(50);
+ }
}
}
- return false;
+ Instance.Deprovision();
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed
+ // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed
+ // so the watchdog can attempt recovery.
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
+ }
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {});
+ throw;
+ }
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {});
+ RemoveInstance(Instance, ActiveInstanceIndex, ModuleId);
+}
+
+Hub::Response
+Hub::Hibernate(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
- OutInfo.Port = Instance->GetBasePort();
- // TODO: base URI? Would need to know what host name / IP to use
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
- if (m_ProvisionedModuleCallback)
{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_InstanceLookup.find(ModuleId);
+ if (It == m_InstanceLookup.end())
+ {
+ return Response{EResponseCode::NotFound};
+ }
+
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+
+ switch (CurrentState)
+ {
+ case HubInstanceState::Hibernating:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Provisioned:
+ break;
+ case HubInstanceState::Hibernated:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
+ }
+
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+
+ // Re-validate state after acquiring the instance lock: WatchDog may have transitioned
+ // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above.
+
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Provisioned)
+ {
+ Instance = {};
+ return Response{
+ EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))};
+ }
+ }
+
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on
+ // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
+ // will have already changed the state and the re-validate above will reject it.
+
+ ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {});
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
try
{
- m_ProvisionedModuleCallback(ModuleId, OutInfo);
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ ActiveInstanceIndex,
+ OldState,
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteHibernate(*Instance, ActiveInstanceIndex, OldState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
- catch (const std::exception& Ex)
+ catch (const std::exception& DispatchEx)
{
- ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ // Dispatch failed: undo latch increment and roll back state.
+ ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
+ }
+
+ throw;
}
}
+ else
+ {
+ CompleteHibernate(Instance, ActiveInstanceIndex, OldState);
+ }
- return true;
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}
-bool
-Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
+void
+Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
+{
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
+
+ try
+ {
+ Instance.Hibernate();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {});
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
+ Instance = {};
+ throw;
+ }
+}
+
+Hub::Response
+Hub::Wake(const std::string& ModuleId)
{
- std::unique_ptr<StorageServerInstance> Instance;
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
+ auto It = m_InstanceLookup.find(ModuleId);
+ if (It == m_InstanceLookup.end())
{
- OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
+ return Response{EResponseCode::NotFound};
+ }
- ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- return false;
- }
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
- if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
+ switch (CurrentState)
{
- ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
-
- // Not found, OutReason should be empty
- return false;
+ case HubInstanceState::Waking:
+ return Response{EResponseCode::Accepted};
+ case HubInstanceState::Hibernated:
+ break;
+ case HubInstanceState::Provisioned:
+ return Response{EResponseCode::Completed};
+ default:
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
}
- else
+
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+
+ // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may
+ // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive.
+ HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (ActualState != HubInstanceState::Hibernated)
{
- Instance = std::move(It->second);
- m_Instances.erase(It);
- m_DeprovisioningModules.emplace(ModuleId);
+ Instance = {};
+ return Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))};
}
}
- uint16_t BasePort = Instance->GetBasePort();
- std::string BaseUri; // TODO?
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on
+ // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
+ // will have already changed the state and the re-validate above will reject it.
+
+ ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
- if (m_DeprovisionedModuleCallback)
+ HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking);
+ const uint16_t Port = Instance.GetBasePort();
+ NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {});
+
+ if (m_WorkerPool)
{
+ m_BackgroundWorkLatch.AddCount(1);
try
{
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ ActiveInstanceIndex,
+ OldState,
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteWake(*Instance, ActiveInstanceIndex, OldState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
- catch (const std::exception& Ex)
+ catch (const std::exception& DispatchEx)
{
- ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
+ // Dispatch failed: undo latch increment and roll back state.
+ ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
+ }
+
+ throw;
}
}
+ else
+ {
+ CompleteWake(Instance, ActiveInstanceIndex, OldState);
+ }
- // The module is deprovisioned outside the lock to avoid blocking other operations.
- //
- // To ensure that no new provisioning can occur while we're deprovisioning,
- // we add the module ID to m_DeprovisioningModules and remove it once
- // deprovisioning is complete.
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
- auto _ = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(ModuleId);
- m_FreePorts.push_back(BasePort);
- });
+void
+Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
+{
+ const std::string ModuleId(Instance.GetModuleId());
+ const uint16_t Port = Instance.GetBasePort();
- Instance->Deprovision();
+ try
+ {
+ Instance.Wake();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {});
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
+ UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
+ Instance = {};
+ throw;
+ }
+}
- return true;
+void
+Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId)
+{
+ Instance = {};
+
+ std::unique_ptr<StorageServerInstance> DeleteInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
+ DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DeleteInstance.reset();
+}
+
+void
+Hub::ObliterateBackendData(std::string_view ModuleId)
+{
+ std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId;
+ std::filesystem::path TempDir = m_HydrationTempPath / ModuleId;
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = TempDir,
+ .ModuleId = std::string(ModuleId),
+ .TargetSpecification = m_HydrationTargetSpecification,
+ .Options = m_HydrationOptions};
+ if (m_Config.OptionalHydrationWorkerPool)
+ {
+ Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool,
+ .AbortFlag = &AbortFlag,
+ .PauseFlag = &PauseFlag});
+ }
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Obliterate();
}
bool
-Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance)
+Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
{
RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
{
- if (OutInstance)
+ if (OutInstanceInfo)
{
- *OutInstance = It->second.get();
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(Instance);
+ InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(),
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()};
+ Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load();
+ Info.Port = Instance->GetBasePort();
+
+ *OutInstanceInfo = Info;
}
return true;
}
- else if (OutInstance)
- {
- *OutInstance = nullptr;
- }
return false;
}
void
-Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback)
+Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback)
{
- RwLock::SharedLockScope _(m_Lock);
- for (auto& It : m_Instances)
+ std::vector<std::pair<std::string, InstanceInfo>> Infos;
{
- Callback(*It.second);
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
+ {
+ const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(Instance);
+ InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(),
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()};
+ Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load();
+ Info.Port = Instance->GetBasePort();
+
+ Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info));
+ }
+ }
+
+ for (const std::pair<std::string, InstanceInfo>& Info : Infos)
+ {
+ Callback(Info.first, Info.second);
}
}
int
Hub::GetInstanceCount()
{
- RwLock::SharedLockScope _(m_Lock);
- return gsl::narrow_cast<int>(m_Instances.size());
+ return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); });
}
-void
-Hub::UpdateCapacityMetrics()
+bool
+Hub::CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason)
{
- m_HostMetrics = GetSystemMetrics();
+ if (m_ObliteratingInstances.contains(std::string(ModuleId)))
+ {
+ OutReason = fmt::format("module '{}' is being obliterated", ModuleId);
+ return false;
+ }
+
+ if (m_FreeActiveInstanceIndexes.empty())
+ {
+ OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
+
+ return false;
+ }
+
+ const uint64_t DiskUsedBytes = m_DiskSpace.Free <= m_DiskSpace.Total ? m_DiskSpace.Total - m_DiskSpace.Free : 0;
+ if (m_Config.ResourceLimits.DiskUsageBytes > 0 && DiskUsedBytes > m_Config.ResourceLimits.DiskUsageBytes)
+ {
+ OutReason =
+ fmt::format("disk usage ({}) exceeds ({})", NiceBytes(DiskUsedBytes), NiceBytes(m_Config.ResourceLimits.DiskUsageBytes));
+ return false;
+ }
- // Update per-instance metrics
+ const uint64_t RamUsedMiB = m_SystemMetrics.AvailSystemMemoryMiB <= m_SystemMetrics.SystemMemoryMiB
+ ? m_SystemMetrics.SystemMemoryMiB - m_SystemMetrics.AvailSystemMemoryMiB
+ : 0;
+ const uint64_t RamUsedBytes = RamUsedMiB * 1024 * 1024;
+ if (m_Config.ResourceLimits.MemoryUsageBytes > 0 && RamUsedBytes > m_Config.ResourceLimits.MemoryUsageBytes)
+ {
+ OutReason =
+ fmt::format("ram usage ({}) exceeds ({})", NiceBytes(RamUsedBytes), NiceBytes(m_Config.ResourceLimits.MemoryUsageBytes));
+ return false;
+ }
+
+ return true;
}
-void
-Hub::UpdateStats()
+uint16_t
+Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const
{
- m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
+ return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex);
}
bool
-Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
+Hub::IsInstancePort(uint16_t Port) const
{
- if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
+ if (Port < m_Config.BasePortNumber)
{
- OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
+ return false;
+ }
+ size_t Index = Port - m_Config.BasePortNumber;
+ if (Index >= m_ActiveInstances.size())
+ {
+ return false;
+ }
+ return m_ActiveInstances[Index].State.load(std::memory_order_relaxed) != HubInstanceState::Unprovisioned;
+}
+HubInstanceState
+Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState)
+{
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) {
+ switch (From)
+ {
+ case HubInstanceState::Unprovisioned:
+ return To == HubInstanceState::Provisioning;
+ case HubInstanceState::Provisioned:
+ return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed ||
+ To == HubInstanceState::Obliterating;
+ case HubInstanceState::Hibernated:
+ return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Obliterating;
+ case HubInstanceState::Crashed:
+ return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning ||
+ To == HubInstanceState::Recovering || To == HubInstanceState::Obliterating;
+ case HubInstanceState::Provisioning:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
+ case HubInstanceState::Hibernating:
+ return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned;
+ case HubInstanceState::Waking:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated;
+ case HubInstanceState::Deprovisioning:
+ return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated ||
+ To == HubInstanceState::Crashed;
+ case HubInstanceState::Recovering:
+ return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned;
+ case HubInstanceState::Obliterating:
+ return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
+ }
return false;
+ }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0);
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(Now);
+ m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.store(Now);
+ return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
+}
+
+void
+Hub::AttemptRecoverInstance(std::string_view ModuleId)
+{
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+ size_t ActiveInstanceIndex = (size_t)-1;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_ShutdownFlag.load())
+ {
+ return;
+ }
+
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ if (It == m_InstanceLookup.end())
+ {
+ return;
+ }
+
+ ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
+ ZEN_ASSERT(InstanceRaw);
+ HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (CurrentState != HubInstanceState::Crashed)
+ {
+ return;
+ }
+
+ Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false);
+ if (!Instance)
+ {
+ // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed.
+ return;
+ }
+
+ ZEN_ASSERT(!Instance.IsRunning());
+
+ (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering);
}
- if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
+ ZEN_ASSERT(Instance);
+ ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
+ ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering);
+
+ NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {});
+
+ // Dehydrate before trying to recover so any salvageable data is preserved.
+ try
{
- OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
+ Instance.Deprovision();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second);
+
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DestroyInstance.reset();
+ return;
+ }
- return false;
+ try
+ {
+ Instance.Provision();
+ UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what());
+ NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {});
+ Instance = {};
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope HubLock(m_Lock);
+ auto It = m_InstanceLookup.find(std::string(ModuleId));
+ ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
+ ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second);
+
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
+ }
+ DestroyInstance.reset();
+ return;
}
+}
- if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit)
+bool
+Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex)
+{
+ const std::string ModuleId(LockedInstance.GetModuleId());
+
+ HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (LockedInstance.IsRunning())
{
- OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
+ m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics());
+ if (InstanceState == HubInstanceState::Provisioned)
+ {
+ const uint16_t Port = LockedInstance.GetBasePort();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+
+ // We do the activity check without holding a lock to the instance
+ LockedInstance = {};
+
+ uint64_t ActivitySum = PreviousActivitySum;
+
+ std::chrono::system_clock::time_point NextCheckTime =
+ LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin;
+ if (Now >= NextCheckTime)
+ {
+ ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port));
+ HttpClient::Response Result =
+ ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Result.IsSuccess())
+ {
+ CbObject Response = Result.AsObject();
+ if (Response)
+ {
+ ActivitySum = Response["sum"].AsUInt64();
+ }
+ }
+ }
+
+ if (ActivitySum != PreviousActivitySum)
+ {
+ m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() {
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
+ {
+ const uint64_t ActiveInstanceIndex = It->second;
+ ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex];
+
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState == InstanceState)
+ {
+ if (Instance.LastActivityTime.load() == LastActivityTime &&
+ Instance.LastKnownActivitySum.load() == PreviousActivitySum)
+ {
+ Instance.LastActivityTime.store(Now);
+ Instance.LastKnownActivitySum.store(ActivitySum);
+ }
+ }
+ }
+ });
+ }
+ else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime ||
+ Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ }
+
+ return true;
+ }
+ else if (InstanceState == HubInstanceState::Provisioned)
+ {
+ // Process is not running but state says it should be - instance died unexpectedly.
+ const uint16_t Port = LockedInstance.GetBasePort();
+ UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
+ LockedInstance = {};
return false;
}
+ else if (InstanceState == HubInstanceState::Hibernated)
+ {
+ // Process is not running - no HTTP activity check is possible.
+ // Use a pure time-based check; the margin window does not apply here.
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ LockedInstance = {};
+
+ if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ return true;
+ }
+ else
+ {
+ // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Obliterating) - expected, skip.
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
+ // lock was busy on a previous cycle and recovery is already pending.
+ return true;
+ }
+}
- // Since deprovisioning happens outside the lock and we don't add the port back until the instance is full shut down we might be under
- // the instance limit but all ports may be in use
- if (m_FreePorts.empty())
+void
+Hub::UpdateMachineMetrics()
+{
+ try
{
- OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})",
- m_Config.InstanceLimit - m_Instances.size());
+ bool DiskSpaceOk = false;
+ DiskSpace Disk;
- return false;
+ std::filesystem::path ChildDir = m_RunEnvironment.GetChildBaseDir();
+ if (!ChildDir.empty())
+ {
+ if (DiskSpaceInfo(ChildDir, Disk))
+ {
+ DiskSpaceOk = true;
+ }
+ else
+ {
+ ZEN_WARN("Failed to query disk space for '{}'; disk-based provisioning limits will not be enforced", ChildDir);
+ }
+ }
+
+ SystemMetrics Metrics = GetSystemMetrics();
+
+ m_Lock.WithExclusiveLock([&]() {
+ if (DiskSpaceOk)
+ {
+ m_DiskSpace = Disk;
+ }
+ m_SystemMetrics = Metrics;
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to update machine metrics. Reason: {}", Ex.what());
}
+}
- // TODO: handle additional resource metrics
+void
+Hub::WatchDog()
+{
+ const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count();
+ const uint64_t CycleProcessingBudgetMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleProcessingBudget).count();
+ const uint64_t InstanceCheckThrottleMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count();
+
+ HttpClient ActivityCheckClient("http://localhost",
+ HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout,
+ .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout},
+ [&]() -> bool { return m_WatchDogEvent.Wait(0); });
+
+ size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
+ while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
+ {
+ try
+ {
+ UpdateMachineMetrics();
- return true;
+ // Snapshot slot count. We iterate all slots (including freed nulls) so
+ // round-robin coverage is not skewed by deprovisioned entries.
+ size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); });
+
+ Stopwatch Timer;
+ bool ShuttingDown = m_ShutdownFlag.load();
+ while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown)
+ {
+ StorageServerInstance::SharedLockedPtr LockedInstance;
+ m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() {
+ // Advance through null (freed) slots under a single lock acquisition.
+ while (SlotsRemaining > 0)
+ {
+ SlotsRemaining--;
+ CheckInstanceIndex++;
+ if (CheckInstanceIndex >= m_ActiveInstances.size())
+ {
+ CheckInstanceIndex = 0;
+ }
+ StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get();
+ if (Instance)
+ {
+ LockedInstance = Instance->LockShared(/*Wait*/ false);
+ break; // Found a live slot (locked or busy); stop scanning this batch.
+ }
+ }
+ });
+
+ if (!LockedInstance)
+ {
+ // Either all remaining slots were null, or the live slot's lock was busy -- move on.
+ continue;
+ }
+
+ std::string ModuleId(LockedInstance.GetModuleId());
+
+ try
+ {
+ bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
+ if (InstanceIsOk)
+ {
+ ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
+ }
+ else
+ {
+ ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
+ AttemptRecoverInstance(ModuleId);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what());
+ }
+ ShuttingDown |= m_ShutdownFlag.load();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc
+ ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what());
+ }
+ }
+}
+
+void
+Hub::NotifyStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri)
+{
+ if (m_ModuleStateChangeCallback && OldState != NewState)
+ {
+ try
+ {
+ m_ModuleStateChangeCallback(ModuleId,
+ HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort},
+ OldState,
+ NewState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what());
+ }
+ }
}
#if ZEN_WITH_TESTS
TEST_SUITE_BEGIN("server.hub");
+static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)};
+
namespace hub_testutils {
+ struct TestHubPools
+ {
+ WorkerThreadPool ProvisionPool;
+ WorkerThreadPool HydrationPool;
+
+ explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {}
+ };
+
ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
{
return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir);
}
- std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
- Hub::Configuration Config = {},
- Hub::ProvisionModuleCallbackFunc ProvisionCallback = {},
- Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {})
+ std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
+ Hub::Configuration Config = {},
+ Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {},
+ TestHubPools* Pools = nullptr)
+ {
+ if (Pools)
+ {
+ Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool;
+ Config.OptionalHydrationWorkerPool = &Pools->HydrationPool;
+ }
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
+ }
+
+ struct CallbackRecord
+ {
+ std::string ModuleId;
+ uint16_t Port;
+ };
+
+ struct StateChangeCapture
+ {
+ RwLock CallbackMutex;
+ std::vector<CallbackRecord> ProvisionCallbacks;
+ std::vector<CallbackRecord> DeprovisionCallbacks;
+
+ auto CaptureFunc()
+ {
+ return [this](std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState) {
+ ZEN_UNUSED(PreviousState);
+ if (NewState == HubInstanceState::Provisioned)
+ {
+ CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ }
+ else if (NewState == HubInstanceState::Unprovisioned)
+ {
+ CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ }
+ };
+ }
+ };
+
+ // Poll until Find() returns false for the given module (i.e. async deprovision completes).
+ static bool WaitForInstanceGone(Hub& HubInstance,
+ std::string_view ModuleId,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (!HubInstance.Find(ModuleId))
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return !HubInstance.Find(ModuleId);
+ }
+
+ // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete).
+ static bool WaitForInstanceCount(Hub& HubInstance,
+ int ExpectedCount,
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
+ std::chrono::seconds Timeout = std::chrono::seconds(30))
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback));
+ const auto Deadline = std::chrono::steady_clock::now() + Timeout;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance.GetInstanceCount() == ExpectedCount)
+ {
+ return true;
+ }
+ std::this_thread::sleep_for(PollInterval);
+ }
+ return HubInstance.GetInstanceCount() == ExpectedCount;
}
} // namespace hub_testutils
-TEST_CASE("hub.provision_basic")
+TEST_CASE("hub.provision")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ struct TransitionRecord
+ {
+ HubInstanceState OldState;
+ HubInstanceState NewState;
+ };
+ RwLock CaptureMutex;
+ std::vector<TransitionRecord> Transitions;
+
+ hub_testutils::StateChangeCapture CaptureInstance;
+
+ auto CaptureFunc =
+ [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) {
+ CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
+ CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState);
+ };
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+ // Provision
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- CHECK(HubInstance->Find("module_a"));
+ Hub::InstanceInfo InstanceInfo;
+ REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
+ CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
+ CHECK_NE(InstanceInfo.StateChangeTime, std::chrono::system_clock::time_point::min());
+ CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now());
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
+ // Verify provision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Deprovision
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Verify deprovision callback
+ {
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a");
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
+ }
+
+ // Verify full transition sequence
+ {
+ RwLock::SharedLockScope _(CaptureMutex);
+ REQUIRE_EQ(Transitions.size(), 4u);
+ CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned);
+ CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning);
+ CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned);
+ CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning);
+ CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned);
+ }
}
TEST_CASE("hub.provision_config")
@@ -527,69 +1980,32 @@ TEST_CASE("hub.provision_config")
CHECK_FALSE(HubInstance->Find("module_a"));
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- CHECK(HubInstance->Find("module_a"));
+ Hub::InstanceInfo InstanceInfo;
+ REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
+ CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri));
HttpClient::Response TestResponse = Client.Get("/status/builds");
CHECK(TestResponse.IsSuccess());
CHECK(TestResponse.AsObject()["ok"].AsBool());
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
- CHECK_EQ(HubInstance->GetInstanceCount(), 0);
- CHECK_FALSE(HubInstance->Find("module_a"));
-}
-
-TEST_CASE("hub.provision_callbacks")
-{
- ScopedTemporaryDirectory TempDir;
-
- struct CallbackRecord
- {
- std::string ModuleId;
- uint16_t Port;
- };
- RwLock CallbackMutex;
- std::vector<CallbackRecord> ProvisionRecords;
- std::vector<CallbackRecord> DeprovisionRecords;
-
- auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); });
- };
- auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); });
- };
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb));
-
- HubProvisionedInstanceInfo Info;
- std::string Reason;
-
- const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
-
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionRecords.size(), 1u);
- CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module");
- CHECK_EQ(ProvisionRecords[0].Port, Info.Port);
- CHECK_NE(ProvisionRecords[0].Port, 0);
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ CHECK_FALSE(HubInstance->Find("module_a"));
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(DeprovisionRecords.size(), 1u);
- CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module");
- CHECK_NE(DeprovisionRecords[0].Port, 0);
- CHECK_EQ(ProvisionRecords.size(), 1u);
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
}
}
@@ -603,69 +2019,28 @@ TEST_CASE("hub.instance_limit")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason);
- REQUIRE_MESSAGE(FirstResult, Reason);
+ const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message);
- const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason);
- REQUIRE_MESSAGE(SecondResult, Reason);
+ const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info);
+ REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- Reason.clear();
- const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason);
- CHECK_FALSE(ThirdResult);
+ const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info);
+ CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- CHECK_NE(Reason.find("instance limit"), std::string::npos);
+ CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos);
- HubInstance->Deprovision("limit_a", Reason);
+ HubInstance->Deprovision("limit_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- Reason.clear();
- const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason);
- CHECK_MESSAGE(FourthResult, Reason);
+ const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info);
+ CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
-TEST_CASE("hub.deprovision_nonexistent")
-{
- ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
-
- std::string Reason;
- const bool Result = HubInstance->Deprovision("never_provisioned", Reason);
- CHECK_FALSE(Result);
- CHECK(Reason.empty());
- CHECK_EQ(HubInstance->GetInstanceCount(), 0);
-}
-
-TEST_CASE("hub.enumerate_modules")
-{
- ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
-
- HubProvisionedInstanceInfo Info;
- std::string Reason;
-
- REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason);
- REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason);
-
- std::vector<std::string> Ids;
- HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); });
- CHECK_EQ(Ids.size(), 2u);
- const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
- const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
- CHECK(FoundA);
- CHECK(FoundB);
-
- HubInstance->Deprovision("enum_a", Reason);
- Ids.clear();
- HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); });
- REQUIRE_EQ(Ids.size(), 1u);
- CHECK_EQ(Ids[0], "enum_b");
-}
-
-TEST_CASE("hub.max_instance_count")
+TEST_CASE("hub.enumerate_and_instance_tracking")
{
ScopedTemporaryDirectory TempDir;
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
@@ -673,97 +2048,58 @@ TEST_CASE("hub.max_instance_count")
CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("track_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
- REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason);
- CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
-
- const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
-
- HubInstance->Deprovision("max_a", Reason);
- CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
-}
-
-TEST_CASE("hub.concurrent")
-{
- ScopedTemporaryDirectory TempDir;
- Hub::Configuration Config;
- Config.BasePortNumber = 22000;
- Config.InstanceLimit = 10;
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- constexpr int kHalf = 3;
-
- // Serially pre-provision kHalf modules
- for (int I = 0; I < kHalf; ++I)
{
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ const Hub::Response R = HubInstance->Provision("track_b", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
- CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
-
- // Simultaneously:
- // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N")
- // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N")
- // The two pools use distinct OS threads, so provisions and deprovisions are interleaved.
-
- // Use int rather than bool to avoid std::vector<bool> bitfield packing,
- // which would cause data races on concurrent per-index writes.
- std::vector<int> ProvisionResults(kHalf, 0);
- std::vector<std::string> ProvisionReasons(kHalf);
- std::vector<int> DeprovisionResults(kHalf, 0);
+ CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
+ // Enumerate both modules
{
- WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners");
- WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers");
-
- std::vector<std::future<void>> ProvisionFutures(kHalf);
- std::vector<std::future<void>> DeprovisionFutures(kHalf);
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ CHECK_EQ(Ids.size(), 2u);
+ CHECK_EQ(ProvisionedCount, 2);
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end());
+ CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end());
+ }
- for (int I = 0; I < kHalf; ++I)
- {
- ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool Result =
- HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
- ProvisionResults[I] = Result ? 1 : 0;
- ProvisionReasons[I] = Reason;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
-
- DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
- std::string Reason;
- const bool Result =
- HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
- DeprovisionResults[I] = Result ? 1 : 0;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
- }
+ const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
- for (std::future<void>& F : ProvisionFutures)
- {
- F.get();
- }
- for (std::future<void>& F : DeprovisionFutures)
- {
- F.get();
- }
- }
+ // Deprovision one - max instance count must not decrease
+ HubInstance->Deprovision("track_a");
+ CHECK_EQ(HubInstance->GetInstanceCount(), 1);
+ CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
- for (int I = 0; I < kHalf; ++I)
+ // Enumerate after deprovision
{
- CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]);
- CHECK(DeprovisionResults[I] != 0);
+ std::vector<std::string> Ids;
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ Ids.push_back(std::string(ModuleId));
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
+ });
+ REQUIRE_EQ(Ids.size(), 1u);
+ CHECK_EQ(Ids[0], "track_b");
+ CHECK_EQ(ProvisionedCount, 1);
}
- // Only the newly provisioned modules should remain
- CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
}
TEST_CASE("hub.concurrent_callbacks")
@@ -773,23 +2109,9 @@ TEST_CASE("hub.concurrent_callbacks")
Config.BasePortNumber = 22300;
Config.InstanceLimit = 10;
- struct CallbackRecord
- {
- std::string ModuleId;
- uint16_t Port;
- };
- RwLock CallbackMutex;
- std::vector<CallbackRecord> ProvisionCallbacks;
- std::vector<CallbackRecord> DeprovisionCallbacks;
-
- auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
- };
- auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
- };
+ hub_testutils::StateChangeCapture CaptureInstance;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb));
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc());
constexpr int kHalf = 3;
@@ -798,15 +2120,15 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info);
+ REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message);
}
CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
{
- RwLock::ExclusiveLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
- ProvisionCallbacks.clear();
+ RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ CaptureInstance.ProvisionCallbacks.clear();
}
// Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones.
@@ -823,23 +2145,21 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
- ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool Result =
- HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
- ProvisionResults[I] = Result ? 1 : 0;
- ProvisionReasons[I] = Reason;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
-
- DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
- std::string Reason;
- const bool Result =
- HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
- DeprovisionResults[I] = Result ? 1 : 0;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
+ ProvisionFutures[I] =
+ Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info);
+ ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ ProvisionReasons[I] = Result.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ DeprovisionFutures[I] =
+ Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I));
+ DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (std::future<void>& F : ProvisionFutures)
@@ -863,17 +2183,17 @@ TEST_CASE("hub.concurrent_callbacks")
// Each new_* module must have triggered exactly one provision callback with a non-zero port.
// Each pre_* module must have triggered exactly one deprovision callback with a non-zero port.
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
- REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf));
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast<size_t>(kHalf));
- for (const CallbackRecord& Record : ProvisionCallbacks)
+ for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks)
{
CHECK_NE(Record.Port, 0);
const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0;
CHECK_MESSAGE(IsNewModule, Record.ModuleId);
}
- for (const CallbackRecord& Record : DeprovisionCallbacks)
+ for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks)
{
CHECK_NE(Record.Port, 0);
const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0;
@@ -894,14 +2214,13 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
@@ -914,22 +2233,639 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
}
# endif // ZEN_PLATFORM_WINDOWS
+TEST_CASE("hub.hibernate_wake_obliterate")
+{
+ ScopedTemporaryDirectory TempDir;
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22600;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ Hub::InstanceInfo Info;
+
+ // Error cases on non-existent modules (no provision needed)
+ CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+
+ // Provision
+ {
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ const std::chrono::system_clock::time_point ProvisionedTime = Info.StateChangeTime;
+ CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min());
+ CHECK_LE(ProvisionedTime, std::chrono::system_clock::now());
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Double-wake on provisioned module is idempotent
+ CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
+ // Hibernate
+ {
+ const Hub::Response R = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Hibernated);
+ const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime;
+ CHECK_GE(HibernatedTime, ProvisionedTime);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Double-hibernate on already-hibernated module is idempotent
+ CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed);
+
+ // Wake
+ {
+ const Hub::Response R = HubInstance->Wake("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ CHECK_GE(Info.StateChangeTime, HibernatedTime);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Hibernate again for obliterate-from-hibernated test
+ {
+ const Hub::Response R = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Hibernated);
+
+ // Obliterate from hibernated
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ CHECK_FALSE(HubInstance->Find("hib_a"));
+
+ // Re-provision for obliterate-from-provisioned test
+ {
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Obliterate from provisioned
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+ CHECK_FALSE(HubInstance->Find("hib_a"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Obliterate deprovisioned module (not tracked by hub, backend data may exist)
+ {
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Deprovision("hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ CHECK_FALSE(HubInstance->Find("hib_a"));
+ {
+ const Hub::Response R = HubInstance->Obliterate("hib_a");
+ CHECK(R.ResponseCode == Hub::EResponseCode::Completed);
+ }
+
+ // Obliterate of a never-provisioned module also succeeds (no-op backend cleanup)
+ CHECK(HubInstance->Obliterate("never_existed").ResponseCode == Hub::EResponseCode::Completed);
+}
+
+TEST_CASE("hub.async_hibernate_wake")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23000;
+
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ Hub::InstanceInfo Info;
+
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+
+ // Provision and wait until Provisioned
+ {
+ const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Ready = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Ready = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout");
+ }
+
+ // Hibernate asynchronously and poll until Hibernated
+ {
+ const Hub::Response R = HubInstance->Hibernate("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Hibernated = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated)
+ {
+ Hibernated = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Wake asynchronously and poll until Provisioned
+ {
+ const Hub::Response R = HubInstance->Wake("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Woken = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Woken = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Deprovision asynchronously and poll until the instance is gone
+ {
+ const Hub::Response R = HubInstance->Deprovision("async_hib_a");
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout");
+}
+
TEST_CASE("hub.recover_process_crash")
{
	// End-to-end watchdog recovery: kill the module's child process and verify the
	// watchdog notices the crash, restarts the instance on the same port, and emits
	// the Provisioned -> Crashed -> Recovering -> Provisioned callback sequence.
	ScopedTemporaryDirectory TempDir;

	// Record of one state-transition callback invocation.
	struct TransitionRecord
	{
		HubInstanceState OldState;
		HubInstanceState NewState;
	};
	RwLock CaptureMutex;
	std::vector<TransitionRecord> Transitions;
	// NOTE(review): the callback presumably fires on a hub-internal (watchdog) thread,
	// hence the exclusive lock around the vector push - confirm against Hub's contract.
	auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) {
		CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
	};

	// Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
	Hub::Configuration Config;
	Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
	Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);

	std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(CaptureFunc));

	// Synchronous provision (Completed, not Accepted): the instance is fully up on return.
	HubProvisionedInstanceInfo Info;
	{
		const Hub::Response R = HubInstance->Provision("module_a", Info);
		REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
	}

	// Kill the child process to simulate a crash, then poll until the watchdog detects it,
	// recovers the instance, and the new process is serving requests.
	HubInstance->TerminateModuleForTesting("module_a");

	constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
	constexpr auto kTimeoutMs = std::chrono::seconds(15);
	const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;

	// A successful HTTP health check on the same port confirms the new process is up.
	HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
	bool Recovered = false;
	while (std::chrono::steady_clock::now() < Deadline)
	{
		std::this_thread::sleep_for(kPollIntervalMs);
		Hub::InstanceInfo InstanceInfo;
		if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned &&
			ModClient.Get("/health/"))
		{
			// Recovery must reuse the originally assigned port.
			CHECK_EQ(InstanceInfo.Port, Info.Port);
			Recovered = true;
			break;
		}
	}
	REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");

	// Verify the full crash/recovery callback sequence
	{
		RwLock::SharedLockScope _(CaptureMutex);
		REQUIRE_GE(Transitions.size(), 3u);
		// Locate the crash transition, then require Recovering and Provisioned to follow
		// it immediately; transitions were appended in callback order above.
		const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) {
			return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed;
		});
		REQUIRE_NE(CrashedIt, Transitions.end());
		const auto RecoveringIt = CrashedIt + 1;
		REQUIRE_NE(RecoveringIt, Transitions.end());
		CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed);
		CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering);
		const auto RecoveredIt = RecoveringIt + 1;
		REQUIRE_NE(RecoveredIt, Transitions.end());
		CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering);
		CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned);
	}

	// After recovery, deprovision should succeed and a re-provision should work.
	{
		const Hub::Response R = HubInstance->Deprovision("module_a");
		CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
	}
	CHECK_EQ(HubInstance->GetInstanceCount(), 0);

	// Fresh provision of the same module id gets a valid port and serves traffic.
	HubProvisionedInstanceInfo NewInfo;
	{
		const Hub::Response R = HubInstance->Provision("module_a", NewInfo);
		CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
	}
	CHECK_NE(NewInfo.Port, 0);
	HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout);
	CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests");
}
+
+TEST_CASE("hub.async_provision_concurrent")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22800;
+ Config.InstanceLimit = kModuleCount;
+
+ hub_testutils::TestHubPools Pools(4);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+ std::vector<std::string> Reasons(kModuleCount);
+ std::vector<int> Results(kModuleCount, 0);
+
+ {
+ WorkerThreadPool Callers(kModuleCount, "hub_async_callers");
+ std::vector<std::future<void>> Futures(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0;
+ Reasons[I] = Resp.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ for (std::future<void>& F : Futures)
+ {
+ F.get();
+ }
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]);
+ CHECK_NE(Infos[I].Port, 0);
+ }
+
+ // Poll until all instances reach Provisioned state
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+
+ bool AllProvisioned = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ int ProvisionedCount = 0;
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Hub::InstanceInfo InstanceInfo;
+ if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ++ProvisionedCount;
+ }
+ }
+ if (ProvisionedCount == kModuleCount)
+ {
+ AllProvisioned = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout");
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I));
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
+ CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
+ }
+ REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout");
+}
+
+TEST_CASE("hub.async_provision_shutdown_waits")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = kModuleCount;
+ Config.BasePortNumber = 22900;
+
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message);
+ REQUIRE_NE(Infos[I].Port, 0);
+ }
+
+ // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up.
+ HubInstance->Shutdown();
+
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_FALSE(ModClient.Get("/health/"));
+ }
+}
+
+TEST_CASE("hub.async_provision_rejected")
+{
+ // Rejection from CanProvisionInstanceLocked fires synchronously even when a WorkerPool is present.
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = 1;
+ Config.BasePortNumber = 23100;
+
+ hub_testutils::TestHubPools Pools(2);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools);
+
+ HubProvisionedInstanceInfo Info;
+
+ // First provision: dispatched to WorkerPool, returns Accepted
+ const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message);
+ REQUIRE_NE(Info.Port, 0);
+
+ // Second provision: CanProvisionInstanceLocked rejects synchronously (limit reached), returns Rejected
+ HubProvisionedInstanceInfo Info2;
+ const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2);
+ CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_FALSE(SecondResult.Message.empty());
+ CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos);
+ CHECK_EQ(HubInstance->GetInstanceCount(), 1);
+}
+
+TEST_CASE("hub.instance.inactivity.deprovision")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ // Aggressive watchdog settings to keep test duration short.
+ // Provisioned timeout (2s) > Hibernated timeout (1s) - this is the key invariant under test.
+ // Margin (1s) means the HTTP activity check fires at LastActivityTime+1s for Provisioned instances.
+ // The Hibernated branch ignores the margin and uses a direct time-based check.
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23200;
+ Config.InstanceLimit = 3;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+ Config.WatchDog.ProvisionedInactivityTimeout = std::chrono::seconds(2);
+ Config.WatchDog.HibernatedInactivityTimeout = std::chrono::seconds(1);
+ Config.WatchDog.InactivityCheckMargin = std::chrono::seconds(1);
+ Config.WatchDog.ActivityCheckConnectTimeout = std::chrono::milliseconds(200);
+ Config.WatchDog.ActivityCheckRequestTimeout = std::chrono::milliseconds(500);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ // Provision in order: idle first, idle_hib second (then hibernate), persistent last.
+ // idle_hib uses the shorter Hibernated timeout (1s) and expires before idle (2s provisioned).
+ // persistent gets real HTTP PUTs so its activity timer is reset; it must still be alive
+ // after both idle instances are gone.
+
+ HubProvisionedInstanceInfo IdleInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle", IdleInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ HubProvisionedInstanceInfo IdleHibInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle_hib", IdleHibInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ const Hub::Response H = HubInstance->Hibernate("idle_hib");
+ REQUIRE_MESSAGE(H.ResponseCode == Hub::EResponseCode::Completed, H.Message);
+ }
+
+ HubProvisionedInstanceInfo PersistentInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("persistent", PersistentInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ auto PokeInstance = [&](uint16_t Port) {
+ // Make a real storage request to increment the instance's activity sum.
+ // The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
+ {
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
+ .count();
+ IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
+ const HttpClient::Response PutResult =
+ PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
+ CHECK(PutResult);
+ }
+ };
+
+ PokeInstance(IdleInfo.Port);
+ PokeInstance(PersistentInfo.Port);
+
+ Sleep(100);
+
+ // Phase 1: immediately after setup all three instances must still be alive.
+ // No timeout has elapsed yet (only 100ms have passed).
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed");
+
+ // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout.
+ // idle must remain alive - its 2s provisioned timeout has not elapsed yet.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle_hib", std::chrono::milliseconds(100), std::chrono::seconds(3)),
+ "idle_hib was not deprovisioned within its 1s hibernated timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should be gone after its 1s hibernated timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle"),
+ "idle was deprovisioned before its 2s provisioned timeout - only idle_hib's 1s hibernated timeout has elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ PokeInstance(PersistentInfo.Port);
+
+ // Phase 3: idle must be deprovisioned by the watchdog within its 2s provisioned timeout.
+ // persistent must remain alive - its activity timer was reset by PokeInstance.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle", std::chrono::milliseconds(100), std::chrono::seconds(4)),
+ "idle was not deprovisioned within its 2s provisioned timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ HubInstance->Shutdown();
+}
+
+TEST_CASE("hub.machine_metrics")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {});
+
+ // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics
+ // are available immediately without waiting for a watchdog cycle.
+ SystemMetrics SysMetrics;
+ DiskSpace Disk;
+ HubInstance->GetMachineMetrics(SysMetrics, Disk);
+
+ CHECK_GT(Disk.Total, 0u);
+ CHECK_LE(Disk.Free, Disk.Total);
+
+ CHECK_GT(SysMetrics.SystemMemoryMiB, 0u);
+ CHECK_LE(SysMetrics.AvailSystemMemoryMiB, SysMetrics.SystemMemoryMiB);
+
+ CHECK_GT(SysMetrics.VirtualMemoryMiB, 0u);
+ CHECK_LE(SysMetrics.AvailVirtualMemoryMiB, SysMetrics.VirtualMemoryMiB);
+}
+
+TEST_CASE("hub.provision_rejected_resource_limits")
+{
+ // The Hub constructor calls UpdateMachineMetrics() synchronously, so CanProvisionInstanceLocked
+ // can enforce limits immediately without waiting for a watchdog cycle.
+ ScopedTemporaryDirectory TempDir;
+
+ {
+ Hub::Configuration Config;
+ Config.ResourceLimits.DiskUsageBytes = 1;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision("disk_limit", Info);
+ CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_NE(Result.Message.find("disk usage"), std::string::npos);
+ }
+
+ {
+ Hub::Configuration Config;
+ Config.ResourceLimits.MemoryUsageBytes = 1;
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision("mem_limit", Info);
+ CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_NE(Result.Message.find("ram usage"), std::string::npos);
+ }
+}
+
TEST_SUITE_END();
void
+Hub::TerminateModuleForTesting(const std::string& ModuleId)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ auto It = m_InstanceLookup.find(ModuleId);
+ if (It == m_InstanceLookup.end())
+ {
+ return;
+ }
+ StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true);
+ if (Locked)
+ {
+ Locked.TerminateForTesting();
+ }
+}
+
void
hub_forcelink()
{
	// Intentionally empty. NOTE(review): presumably referenced from another
	// translation unit so the linker keeps this file (and its tests) in the
	// binary - confirm at the call site. No runtime behavior.
}