diff options
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 2770 |
1 files changed, 2353 insertions, 417 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 2f3873884..0cb25fdd6 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -9,6 +9,9 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -16,11 +19,8 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS -# include <zencore/filesystem.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <zenhttp/httpclient.h> #endif #include <numeric> @@ -121,29 +121,90 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - ProvisionModuleCallbackFunc&& ProvisionedModuleCallback, - ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback) +ProcessMetrics +Hub::AtomicProcessMetrics::Load() const +{ + return { + .MemoryBytes = MemoryBytes.load(), + .KernelTimeMs = KernelTimeMs.load(), + .UserTimeMs = UserTimeMs.load(), + .WorkingSetSize = WorkingSetSize.load(), + .PeakWorkingSetSize = PeakWorkingSetSize.load(), + .PagefileUsage = PagefileUsage.load(), + .PeakPagefileUsage = PeakPagefileUsage.load(), + }; +} + +void +Hub::AtomicProcessMetrics::Store(const ProcessMetrics& Metrics) +{ + MemoryBytes.store(Metrics.MemoryBytes); + KernelTimeMs.store(Metrics.KernelTimeMs); + UserTimeMs.store(Metrics.UserTimeMs); + WorkingSetSize.store(Metrics.WorkingSetSize); + PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); + PagefileUsage.store(Metrics.PagefileUsage); + PeakPagefileUsage.store(Metrics.PeakPagefileUsage); +} + +void +Hub::AtomicProcessMetrics::Reset() +{ + MemoryBytes.store(0); + KernelTimeMs.store(0); + UserTimeMs.store(0); + WorkingSetSize.store(0); + PeakWorkingSetSize.store(0); + PagefileUsage.store(0); + PeakPagefileUsage.store(0); +} + +void +Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const +{ + m_Lock.WithSharedLock([&]() { + OutSystemMetrict = m_SystemMetrics; + OutDiskSpace = m_DiskSpace; + }); +} + +////////////////////////////////////////////////////////////////////////// + +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback)) -, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback)) +, m_WorkerPool(Config.OptionalProvisionWorkerPool) +, m_BackgroundWorkLatch(1) +, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) +, m_ActiveInstances(Config.InstanceLimit) +, m_FreeActiveInstanceIndexes(Config.InstanceLimit) { - m_HostMetrics = GetSystemMetrics(); - m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; - m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; + ZEN_ASSERT_FORMAT( + Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, + "Provision and hydration worker pools must be distinct to avoid deadlocks"); - m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); - ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); + if (!m_Config.HydrationTargetSpecification.empty()) + { + m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + } + else if (!m_Config.HydrationOptions) + { + std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); + ZEN_INFO("using file hydration path: '{}'", FileHydrationPath); + m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); + } + else + { + m_HydrationOptions = m_Config.HydrationOptions; + } m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits<uint16_t>::max()); - m_FreePorts.resize(Config.InstanceLimit); - std::iota(m_FreePorts.begin(), m_FreePorts.end(), Config.BasePortNumber); + m_InstanceLookup.reserve(Config.InstanceLimit); + std::iota(m_FreeActiveInstanceIndexes.begin(), m_FreeActiveInstanceIndexes.end(), 0); #if ZEN_PLATFORM_WINDOWS if (m_Config.UseJobObject) @@ -159,351 +220,1743 @@ Hub::Hub(const Configuration& Config, } } #endif + + UpdateMachineMetrics(); + + m_WatchDog = std::thread([this]() { WatchDog(); }); } Hub::~Hub() { try { - ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + // Safety call - should normally be properly Shutdown by owner + if (!m_ShutdownFlag.load()) + { + Shutdown(); + } + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} + +void +Hub::Shutdown() +{ + ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + bool Expected = false; + bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true); + + m_WatchDogEvent.Set(); + if (m_WatchDog.joinable()) + { + m_WatchDog.join(); + } + + m_WatchDog = {}; + + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + // Shutdown flag is set and all background work is drained, safe to shut down remaining instances + + m_BackgroundWorkLatch.Reset(1); + } - m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, Instance] : m_Instances) + EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) { + ZEN_UNUSED(Info); + try + { + const Response DepResp = InternalDeprovision(std::string(ModuleId), [](ActiveInstance& Instance) { + ZEN_UNUSED(Instance); + return true; + }); + if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted) { - uint16_t BasePort = Instance->GetBasePort(); - std::string BaseUri; // TODO? + ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); + } + }); - if (m_DeprovisionedModuleCallback) + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + } +} + +Hub::Response +Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + StorageServerInstance::ExclusiveLockedPtr Instance; + bool IsNewInstance = false; + size_t ActiveInstanceIndex = (size_t)-1; + HubInstanceState OldState = HubInstanceState::Unprovisioned; + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) + { + std::string Reason; + if (!CanProvisionInstanceLocked(ModuleId, /* out */ Reason)) + { + ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); + + return Response{EResponseCode::Rejected, Reason}; + } + + IsNewInstance = true; + + ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front(); + m_FreeActiveInstanceIndexes.pop_front(); + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + + try + { + auto NewInstance = std::make_unique<StorageServerInstance>( + m_RunEnvironment, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / ModuleId, + .HydrationTargetSpecification = m_HydrationTargetSpecification, + .HydrationOptions = m_HydrationOptions, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, + ModuleId); + +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + NewInstance->SetJobObject(&m_JobObject); + } +#endif + + Instance = NewInstance->LockExclusive(/*Wait*/ true); + + m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance); + m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Reset(); + m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex); + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); + } + catch (const std::exception&) + { + Instance = {}; + m_ActiveInstances[ActiveInstanceIndex].Instance.reset(); + m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned); + m_InstanceLookup.erase(std::string(ModuleId)); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + throw; + } + + OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex); + + ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + + const int CurrentInstanceCount = gsl::narrow_cast<int>(m_InstanceLookup.size()); + int CurrentMaxCount = m_MaxInstanceCount.load(); + const int NewMax = Max(CurrentMaxCount, CurrentInstanceCount); + + m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax); + } + else + { + ActiveInstanceIndex = It->second; + ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + + OutInfo.Port = InstanceRaw->GetBasePort(); + + switch (CurrentState) + { + case HubInstanceState::Provisioning: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Hibernated: + _.ReleaseNow(); + return Wake(std::string(ModuleId)); + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + + // Re-validate state after acquiring the instance lock: a concurrent Provision may have + // completed between our hub-lock read and LockExclusive, transitioning the state away + // from Crashed/Unprovisioned. + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned) + { + Instance = {}; + if (ActualState == HubInstanceState::Provisioned) + { + return Response{EResponseCode::Completed}; + } + if (ActualState == HubInstanceState::Provisioning) { + return Response{EResponseCode::Accepted}; + } + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))}; + } + // Set Provisioning while both hub lock and instance lock are held so that any + // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned. + OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning); + } + } + + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // Both hub-lock paths above set OldState and updated the state to Provisioning before + // releasing the hub lock, so concurrent operations already see the in-flight state. + + ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {}); + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, + IsNewInstance, + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance); } catch (const std::exception& Ex) { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what()); } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo latch increment and roll back state. + ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {}); + + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + if (IsNewInstance) + { + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); } - Instance->Deprovision(); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } - m_Instances.clear(); - }); + DestroyInstance.reset(); + + throw; + } } - catch (const std::exception& e) + else { - ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance); } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } -bool -Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) +void +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance) { - StorageServerInstance* Instance = nullptr; - bool IsNewInstance = false; + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + std::string BaseUri; // TODO? + + if (m_ShutdownFlag.load() == false) + { + try + { + switch (OldState) + { + case HubInstanceState::Crashed: + case HubInstanceState::Unprovisioned: + Instance.Provision(); + break; + case HubInstanceState::Hibernated: + ZEN_ASSERT(false); // unreachable: Provision redirects Hibernated->Wake before setting Provisioning + break; + default: + ZEN_ASSERT(false); + } + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri); + Instance = {}; + return; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // Instance will be notified and removed below. + } + } + + if (IsNewInstance) + { + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(std::string(ModuleId)); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DestroyInstance.reset(); + } + else + { + // OldState = Crashed: restore without cleanup (instance stays in lookup) + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {}); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + Instance = {}; + } +} + +Hub::Response +Hub::Deprovision(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) { + ZEN_UNUSED(Instance); + return true; + }); +} + +Hub::Response +Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - uint16_t AllocatedPort = 0; - auto RestoreAllocatedPort = MakeGuard([this, &AllocatedPort]() { - if (AllocatedPort != 0) - { - m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; - } - }); - if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) + if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { - std::string Reason; - if (!CanProvisionInstance(ModuleId, /* out */ Reason)) + ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); + + return Response{EResponseCode::NotFound}; + } + else + { + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + + if (!DeprovisionGate(m_ActiveInstances[ActiveInstanceIndex])) { - ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); + return Response{EResponseCode::Rejected, fmt::format("Module '{}' deprovision denied by gate", ModuleId)}; + } - OutReason = Reason; + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); - return false; + switch (CurrentState) + { + case HubInstanceState::Deprovisioning: + case HubInstanceState::Obliterating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + case HubInstanceState::Recovering: + // Recovering is watchdog-managed; reject to avoid interfering with the in-progress + // recovery. The watchdog will transition to Provisioned or Unprovisioned, after + // which deprovision can be retried. + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } - AllocatedPort = m_FreePorts.front(); - m_FreePorts.pop_front(); + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); - IsNewInstance = true; - auto NewInstance = std::make_unique<StorageServerInstance>( - m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = AllocatedPort, - .HydrationTempPath = m_HydrationTempPath, - .FileHydrationPath = m_FileHydrationPath, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, - ModuleId); -#if ZEN_PLATFORM_WINDOWS - if (m_JobObject.IsValid()) + Instance = RawInstance->LockExclusive(/*Wait*/ true); + } + } + + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // The exclusive instance lock acquired above prevents concurrent LockExclusive callers + // from modifying instance state. The state transition to Deprovisioning happens below, + // after the hub lock is released. + + ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {}); + + if (m_WorkerPool) + { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo latch increment and roll back state. + ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {}); { - NewInstance->SetJobObject(&m_JobObject); + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); } -#endif - Instance = NewInstance.get(); - m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); - AllocatedPort = 0; - ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + throw; } - else + } + else + { + CompleteDeprovision(Instance, ActiveInstanceIndex, OldState); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +Hub::Response +Hub::Obliterate(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; + { + RwLock::ExclusiveLockScope Lock(m_Lock); + + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) { - Instance = It->second.get(); + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Obliterating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + case HubInstanceState::Hibernated: + case HubInstanceState::Crashed: + break; + case HubInstanceState::Deprovisioning: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is being deprovisioned, retry after completion", ModuleId)}; + case HubInstanceState::Recovering: + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); + + Instance = RawInstance->LockExclusive(/*Wait*/ true); } + else + { + // Module not tracked by hub - obliterate backend data directly. + // Covers the deprovisioned case where data was preserved via dehydration. + if (m_ObliteratingInstances.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + m_ObliteratingInstances.insert(ModuleId); + Lock.ReleaseNow(); + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId)]() { + auto Guard = MakeGuard([this, ModuleId]() { + m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); }); + m_BackgroundWorkLatch.CountDown(); + }); + try + { + ObliterateBackendData(ModuleId); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of untracked module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed to dispatch async obliterate of untracked module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + } + throw; + } + + return Response{EResponseCode::Accepted}; + } + + auto _ = MakeGuard([this, &ModuleId]() { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + }); + + ObliterateBackendData(ModuleId); - m_ProvisioningModules.emplace(std::string(ModuleId)); + return Response{EResponseCode::Completed}; + } } - ZEN_ASSERT(Instance != nullptr); + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Obliterating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {}); - auto RemoveProvisioningModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - }); + if (m_WorkerPool) + { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteObliterate(*Instance, ActiveInstanceIndex); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); - // NOTE: this is done while not holding the lock, as provisioning may take time - // and we don't want to block other operations. We track which modules are being - // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision - // those modules while in this state. + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {}); + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); + } - UpdateStats(); + throw; + } + } + else + { + CompleteObliterate(Instance, ActiveInstanceIndex); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); try { - Instance->Provision(); + Instance.Obliterate(); } catch (const std::exception& Ex) { - ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - if (IsNewInstance) + ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {}); + throw; + } + + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); +} + +void +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + + try + { + if (OldState == HubInstanceState::Provisioned) { - // Clean up - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + ZEN_INFO("Triggering GC for module {}", ModuleId); + + HttpClient GcClient(fmt::format("http://localhost:{}", Port)); + + HttpClient::KeyValueMap Params; + Params.Entries.insert({"smallobjects", "true"}); + Params.Entries.insert({"skipcid", "false"}); + HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params); + Stopwatch Timer; + while (Response && Timer.GetElapsedTimeMs() < 5000) { - ZEN_ASSERT(It->second != nullptr); - uint16_t BasePort = It->second->GetBasePort(); - m_FreePorts.push_back(BasePort); - m_Instances.erase(It); + Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject)); + if (Response) + { + bool Complete = Response.AsObject()["Status"].AsString() != "Running"; + if (Complete) + { + break; + } + Sleep(50); + } } } - return false; + Instance.Deprovision(); } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what()); + // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed + // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed + // so the watchdog can attempt recovery. + Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {}); + throw; + } + + NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); +} + +Hub::Response +Hub::Hibernate(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); - OutInfo.Port = Instance->GetBasePort(); - // TODO: base URI? Would need to know what host name / IP to use + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; - if (m_ProvisionedModuleCallback) { + RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + return Response{EResponseCode::NotFound}; + } + + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Hibernating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + break; + case HubInstanceState::Hibernated: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + + // Re-validate state after acquiring the instance lock: WatchDog may have transitioned + // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above. + + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Provisioned) + { + Instance = {}; + return Response{ + EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))}; + } + } + + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. + + ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {}); + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); try { - m_ProvisionedModuleCallback(ModuleId, OutInfo); + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteHibernate(*Instance, ActiveInstanceIndex, OldState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } - catch (const std::exception& Ex) + catch (const std::exception& DispatchEx) { - ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + // Dispatch failed: undo latch increment and roll back state. + ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); + } + + throw; } } + else + { + CompleteHibernate(Instance, ActiveInstanceIndex, OldState); + } - return true; + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } -bool -Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) +void +Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + + try + { + Instance.Hibernate(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {}); + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {}); + Instance = {}; + throw; + } +} + +Hub::Response +Hub::Wake(const std::string& ModuleId) { - std::unique_ptr<StorageServerInstance> Instance; + ZEN_ASSERT(!m_ShutdownFlag.load()); + + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; { RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) { - OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); + return Response{EResponseCode::NotFound}; + } - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - return false; - } + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); - if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) + switch (CurrentState) { - ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - - // Not found, OutReason should be empty - return false; + case HubInstanceState::Waking: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Hibernated: + break; + case HubInstanceState::Provisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; } - else + + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + + // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may + // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive. + HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (ActualState != HubInstanceState::Hibernated) { - Instance = std::move(It->second); - m_Instances.erase(It); - m_DeprovisioningModules.emplace(ModuleId); + Instance = {}; + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))}; } } - uint16_t BasePort = Instance->GetBasePort(); - std::string BaseUri; // TODO? + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on + // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below + // will have already changed the state and the re-validate above will reject it. + + ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); - if (m_DeprovisionedModuleCallback) + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {}); + + if (m_WorkerPool) { + m_BackgroundWorkLatch.AddCount(1); try { - m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort}); + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + ActiveInstanceIndex, + OldState, + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteWake(*Instance, ActiveInstanceIndex, OldState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } - catch (const std::exception& Ex) + catch (const std::exception& DispatchEx) { - ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what()); + // Dispatch failed: undo latch increment and roll back state. + ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); + } + + throw; } } + else + { + CompleteWake(Instance, ActiveInstanceIndex, OldState); + } - // The module is deprovisioned outside the lock to avoid blocking other operations. - // - // To ensure that no new provisioning can occur while we're deprovisioning, - // we add the module ID to m_DeprovisioningModules and remove it once - // deprovisioning is complete. + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} - auto _ = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(ModuleId); - m_FreePorts.push_back(BasePort); - }); +void +Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); - Instance->Deprovision(); + try + { + Instance.Wake(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {}); + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); + UpdateInstanceState(Instance, ActiveInstanceIndex, OldState); + NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {}); + Instance = {}; + throw; + } +} - return true; +void +Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId) +{ + Instance = {}; + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DeleteInstance.reset(); +} + +void +Hub::ObliterateBackendData(std::string_view ModuleId) +{ + std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId; + std::filesystem::path TempDir = m_HydrationTempPath / ModuleId; + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + + HydrationConfig Config{.ServerStateDir = ServerStateDir, + .TempDir = TempDir, + .ModuleId = std::string(ModuleId), + .TargetSpecification = m_HydrationTargetSpecification, + .Options = m_HydrationOptions}; + if (m_Config.OptionalHydrationWorkerPool) + { + Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool, + .AbortFlag = &AbortFlag, + .PauseFlag = &PauseFlag}); + } + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Obliterate(); } bool -Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance) +Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) { - if (OutInstance) + if (OutInstanceInfo) { - *OutInstance = It->second.get(); + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(Instance); + InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(), + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()}; + Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load(); + Info.Port = Instance->GetBasePort(); + + *OutInstanceInfo = Info; } return true; } - else if (OutInstance) - { - *OutInstance = nullptr; - } return false; } void -Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback) +Hub::EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback) { - RwLock::SharedLockScope _(m_Lock); - for (auto& It : m_Instances) + std::vector<std::pair<std::string, InstanceInfo>> Infos; { - Callback(*It.second); + RwLock::SharedLockScope _(m_Lock); + for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) + { + const std::unique_ptr<StorageServerInstance>& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(Instance); + InstanceInfo Info{m_ActiveInstances[ActiveInstanceIndex].State.load(), + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.load()}; + Info.Metrics = m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Load(); + Info.Port = Instance->GetBasePort(); + + Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info)); + } + } + + for (const std::pair<std::string, InstanceInfo>& Info : Infos) + { + Callback(Info.first, Info.second); } } int Hub::GetInstanceCount() { - RwLock::SharedLockScope _(m_Lock); - return gsl::narrow_cast<int>(m_Instances.size()); + return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast<int>(m_InstanceLookup.size()); }); } -void -Hub::UpdateCapacityMetrics() +bool +Hub::CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason) { - m_HostMetrics = GetSystemMetrics(); + if (m_ObliteratingInstances.contains(std::string(ModuleId))) + { + OutReason = fmt::format("module '{}' is being obliterated", ModuleId); + return false; + } + + if (m_FreeActiveInstanceIndexes.empty()) + { + OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); + + return false; + } + + const uint64_t DiskUsedBytes = m_DiskSpace.Free <= m_DiskSpace.Total ? m_DiskSpace.Total - m_DiskSpace.Free : 0; + if (m_Config.ResourceLimits.DiskUsageBytes > 0 && DiskUsedBytes > m_Config.ResourceLimits.DiskUsageBytes) + { + OutReason = + fmt::format("disk usage ({}) exceeds ({})", NiceBytes(DiskUsedBytes), NiceBytes(m_Config.ResourceLimits.DiskUsageBytes)); + return false; + } - // Update per-instance metrics + const uint64_t RamUsedMiB = m_SystemMetrics.AvailSystemMemoryMiB <= m_SystemMetrics.SystemMemoryMiB + ? m_SystemMetrics.SystemMemoryMiB - m_SystemMetrics.AvailSystemMemoryMiB + : 0; + const uint64_t RamUsedBytes = RamUsedMiB * 1024 * 1024; + if (m_Config.ResourceLimits.MemoryUsageBytes > 0 && RamUsedBytes > m_Config.ResourceLimits.MemoryUsageBytes) + { + OutReason = + fmt::format("ram usage ({}) exceeds ({})", NiceBytes(RamUsedBytes), NiceBytes(m_Config.ResourceLimits.MemoryUsageBytes)); + return false; + } + + return true; } -void -Hub::UpdateStats() +uint16_t +Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const { - m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); + return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex); } bool -Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) +Hub::IsInstancePort(uint16_t Port) const { - if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + if (Port < m_Config.BasePortNumber) { - OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); + return false; + } + size_t Index = Port - m_Config.BasePortNumber; + if (Index >= m_ActiveInstances.size()) + { + return false; + } + return m_ActiveInstances[Index].State.load(std::memory_order_relaxed) != HubInstanceState::Unprovisioned; +} +HubInstanceState +Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState) +{ + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) { + switch (From) + { + case HubInstanceState::Unprovisioned: + return To == HubInstanceState::Provisioning; + case HubInstanceState::Provisioned: + return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed || + To == HubInstanceState::Obliterating; + case HubInstanceState::Hibernated: + return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Obliterating; + case HubInstanceState::Crashed: + return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || + To == HubInstanceState::Recovering || To == HubInstanceState::Obliterating; + case HubInstanceState::Provisioning: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; + case HubInstanceState::Hibernating: + return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned; + case HubInstanceState::Waking: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated; + case HubInstanceState::Deprovisioning: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated || + To == HubInstanceState::Crashed; + case HubInstanceState::Recovering: + return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned; + case HubInstanceState::Obliterating: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; + } return false; + }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0); + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(Now); + m_ActiveInstances[ActiveInstanceIndex].StateChangeTime.store(Now); + return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState); +} + +void +Hub::AttemptRecoverInstance(std::string_view ModuleId) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_ShutdownFlag.load()) + { + return; + } + + auto It = m_InstanceLookup.find(std::string(ModuleId)); + if (It == m_InstanceLookup.end()) + { + return; + } + + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(InstanceRaw); + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (CurrentState != HubInstanceState::Crashed) + { + return; + } + + Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false); + if (!Instance) + { + // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed. + return; + } + + ZEN_ASSERT(!Instance.IsRunning()); + + (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering); } - if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + ZEN_ASSERT(Instance); + ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); + ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering); + + NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {}); + + // Dehydrate before trying to recover so any salvageable data is preserved. + try { - OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); + Instance.Deprovision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); + + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DestroyInstance.reset(); + return; + } - return false; + try + { + Instance.Provision(); + UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what()); + NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); + Instance = {}; + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); + + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DestroyInstance.reset(); + return; } +} - if (gsl::narrow_cast<int>(m_Instances.size()) >= m_Config.InstanceLimit) +bool +Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, + StorageServerInstance::SharedLockedPtr&& LockedInstance, + size_t ActiveInstanceIndex) +{ + const std::string ModuleId(LockedInstance.GetModuleId()); + + HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (LockedInstance.IsRunning()) { - OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); + m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics()); + if (InstanceState == HubInstanceState::Provisioned) + { + const uint16_t Port = LockedInstance.GetBasePort(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + + // We do the activity check without holding a lock to the instance + LockedInstance = {}; + + uint64_t ActivitySum = PreviousActivitySum; + + std::chrono::system_clock::time_point NextCheckTime = + LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin; + if (Now >= NextCheckTime) + { + ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port)); + HttpClient::Response Result = + ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject)); + if (Result.IsSuccess()) + { + CbObject Response = Result.AsObject(); + if (Response) + { + ActivitySum = Response["sum"].AsUInt64(); + } + } + } + + if (ActivitySum != PreviousActivitySum) + { + m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() { + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) + { + const uint64_t ActiveInstanceIndex = It->second; + ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex]; + + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState == InstanceState) + { + if (Instance.LastActivityTime.load() == LastActivityTime && + Instance.LastKnownActivitySum.load() == PreviousActivitySum) + { + Instance.LastActivityTime.store(Now); + Instance.LastKnownActivitySum.store(ActivitySum); + } + } + } + }); + } + else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now) + { + ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...", + ModuleId, + NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); + (void)InternalDeprovision( + ModuleId, + [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState != InstanceState) + { + ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); + return false; + } + if (Instance.LastActivityTime.load() != LastActivityTime || + Instance.LastKnownActivitySum.load() != PreviousActivitySum) + { + ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId); + return false; + } + return true; + }); + } + } + + return true; + } + else if (InstanceState == HubInstanceState::Provisioned) + { + // Process is not running but state says it should be - instance died unexpectedly. + const uint16_t Port = LockedInstance.GetBasePort(); + UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); + LockedInstance = {}; return false; } + else if (InstanceState == HubInstanceState::Hibernated) + { + // Process is not running - no HTTP activity check is possible. + // Use a pure time-based check; the margin window does not apply here. + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + LockedInstance = {}; + + if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now) + { + ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...", + ModuleId, + NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); + (void)InternalDeprovision( + ModuleId, + [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState != InstanceState) + { + ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); + return false; + } + if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum) + { + ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId); + return false; + } + return true; + }); + } + return true; + } + else + { + // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Obliterating) - expected, skip. + // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance + // lock was busy on a previous cycle and recovery is already pending. + return true; + } +} - // Since deprovisioning happens outside the lock and we don't add the port back until the instance is full shut down we might be under - // the instance limit but all ports may be in use - if (m_FreePorts.empty()) +void +Hub::UpdateMachineMetrics() +{ + try { - OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})", - m_Config.InstanceLimit - m_Instances.size()); + bool DiskSpaceOk = false; + DiskSpace Disk; - return false; + std::filesystem::path ChildDir = m_RunEnvironment.GetChildBaseDir(); + if (!ChildDir.empty()) + { + if (DiskSpaceInfo(ChildDir, Disk)) + { + DiskSpaceOk = true; + } + else + { + ZEN_WARN("Failed to query disk space for '{}'; disk-based provisioning limits will not be enforced", ChildDir); + } + } + + SystemMetrics Metrics = GetSystemMetrics(); + + m_Lock.WithExclusiveLock([&]() { + if (DiskSpaceOk) + { + m_DiskSpace = Disk; + } + m_SystemMetrics = Metrics; + }); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to update machine metrics. Reason: {}", Ex.what()); } +} - // TODO: handle additional resource metrics +void +Hub::WatchDog() +{ + const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count(); + const uint64_t CycleProcessingBudgetMs = + std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleProcessingBudget).count(); + const uint64_t InstanceCheckThrottleMs = + std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count(); + + HttpClient ActivityCheckClient("http://localhost", + HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout, + .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout}, + [&]() -> bool { return m_WatchDogEvent.Wait(0); }); + + size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 + while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) + { + try + { + UpdateMachineMetrics(); - return true; + // Snapshot slot count. We iterate all slots (including freed nulls) so + // round-robin coverage is not skewed by deprovisioned entries. + size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); }); + + Stopwatch Timer; + bool ShuttingDown = m_ShutdownFlag.load(); + while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown) + { + StorageServerInstance::SharedLockedPtr LockedInstance; + m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() { + // Advance through null (freed) slots under a single lock acquisition. + while (SlotsRemaining > 0) + { + SlotsRemaining--; + CheckInstanceIndex++; + if (CheckInstanceIndex >= m_ActiveInstances.size()) + { + CheckInstanceIndex = 0; + } + StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get(); + if (Instance) + { + LockedInstance = Instance->LockShared(/*Wait*/ false); + break; // Found a live slot (locked or busy); stop scanning this batch. + } + } + }); + + if (!LockedInstance) + { + // Either all remaining slots were null, or the live slot's lock was busy -- move on. + continue; + } + + std::string ModuleId(LockedInstance.GetModuleId()); + + try + { + bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); + if (InstanceIsOk) + { + ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); + } + else + { + ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); + AttemptRecoverInstance(ModuleId); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what()); + } + ShuttingDown |= m_ShutdownFlag.load(); + } + } + catch (const std::exception& Ex) + { + // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc + ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what()); + } + } +} + +void +Hub::NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri) +{ + if (m_ModuleStateChangeCallback && OldState != NewState) + { + try + { + m_ModuleStateChangeCallback(ModuleId, + HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort}, + OldState, + NewState); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what()); + } + } } #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); +static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; + namespace hub_testutils { + struct TestHubPools + { + WorkerThreadPool ProvisionPool; + WorkerThreadPool HydrationPool; + + explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {} + }; + ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); } - std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, - Hub::Configuration Config = {}, - Hub::ProvisionModuleCallbackFunc ProvisionCallback = {}, - Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {}) + std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, + Hub::Configuration Config = {}, + Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, + TestHubPools* Pools = nullptr) + { + if (Pools) + { + Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool; + Config.OptionalHydrationWorkerPool = &Pools->HydrationPool; + } + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); + } + + struct CallbackRecord + { + std::string ModuleId; + uint16_t Port; + }; + + struct StateChangeCapture + { + RwLock CallbackMutex; + std::vector<CallbackRecord> ProvisionCallbacks; + std::vector<CallbackRecord> DeprovisionCallbacks; + + auto CaptureFunc() + { + return [this](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + ZEN_UNUSED(PreviousState); + if (NewState == HubInstanceState::Provisioned) + { + CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + else if (NewState == HubInstanceState::Unprovisioned) + { + CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); + } + }; + } + }; + + // Poll until Find() returns false for the given module (i.e. async deprovision completes). + static bool WaitForInstanceGone(Hub& HubInstance, + std::string_view ModuleId, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50), + std::chrono::seconds Timeout = std::chrono::seconds(30)) + { + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (!HubInstance.Find(ModuleId)) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return !HubInstance.Find(ModuleId); + } + + // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete). + static bool WaitForInstanceCount(Hub& HubInstance, + int ExpectedCount, + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50), + std::chrono::seconds Timeout = std::chrono::seconds(30)) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback)); + const auto Deadline = std::chrono::steady_clock::now() + Timeout; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance.GetInstanceCount() == ExpectedCount) + { + return true; + } + std::this_thread::sleep_for(PollInterval); + } + return HubInstance.GetInstanceCount() == ExpectedCount; } } // namespace hub_testutils -TEST_CASE("hub.provision_basic") +TEST_CASE("hub.provision") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + hub_testutils::StateChangeCapture CaptureInstance; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + // Provision HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - CHECK(HubInstance->Find("module_a")); + Hub::InstanceInfo InstanceInfo; + REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); + CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); + CHECK_NE(InstanceInfo.StateChangeTime, std::chrono::system_clock::time_point::min()); + CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now()); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); + // Verify provision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + } + + // Deprovision + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Verify deprovision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + } + + // Verify full transition sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); + } } TEST_CASE("hub.provision_config") @@ -527,69 +1980,32 @@ TEST_CASE("hub.provision_config") CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - CHECK(HubInstance->Find("module_a")); + Hub::InstanceInfo InstanceInfo; + REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); + CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri)); HttpClient::Response TestResponse = Client.Get("/status/builds"); CHECK(TestResponse.IsSuccess()); CHECK(TestResponse.AsObject()["ok"].AsBool()); - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); - CHECK_EQ(HubInstance->GetInstanceCount(), 0); - CHECK_FALSE(HubInstance->Find("module_a")); -} - -TEST_CASE("hub.provision_callbacks") -{ - ScopedTemporaryDirectory TempDir; - - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionRecords; - std::vector<CallbackRecord> DeprovisionRecords; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); }); - }; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb)); - - HubProvisionedInstanceInfo Info; - std::string Reason; - - const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); - { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionRecords.size(), 1u); - CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module"); - CHECK_EQ(ProvisionRecords[0].Port, Info.Port); - CHECK_NE(ProvisionRecords[0].Port, 0); + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("module_a")); { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(DeprovisionRecords.size(), 1u); - CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); - CHECK_NE(DeprovisionRecords[0].Port, 0); - CHECK_EQ(ProvisionRecords.size(), 1u); + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); } } @@ -603,69 +2019,28 @@ TEST_CASE("hub.instance_limit") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason); - REQUIRE_MESSAGE(FirstResult, Reason); + const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message); - const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason); - REQUIRE_MESSAGE(SecondResult, Reason); + const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info); + REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - Reason.clear(); - const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason); - CHECK_FALSE(ThirdResult); + const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info); + CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - CHECK_NE(Reason.find("instance limit"), std::string::npos); + CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos); - HubInstance->Deprovision("limit_a", Reason); + HubInstance->Deprovision("limit_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - Reason.clear(); - const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason); - CHECK_MESSAGE(FourthResult, Reason); + const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info); + CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.deprovision_nonexistent") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - std::string Reason; - const bool Result = HubInstance->Deprovision("never_provisioned", Reason); - CHECK_FALSE(Result); - CHECK(Reason.empty()); - CHECK_EQ(HubInstance->GetInstanceCount(), 0); -} - -TEST_CASE("hub.enumerate_modules") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - HubProvisionedInstanceInfo Info; - std::string Reason; - - REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason); - REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); - - std::vector<std::string> Ids; - HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); - CHECK_EQ(Ids.size(), 2u); - const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); - const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); - CHECK(FoundA); - CHECK(FoundB); - - HubInstance->Deprovision("enum_a", Reason); - Ids.clear(); - HubInstance->EnumerateModules([&](StorageServerInstance& Instance) { Ids.push_back(std::string(Instance.GetModuleId())); }); - REQUIRE_EQ(Ids.size(), 1u); - CHECK_EQ(Ids[0], "enum_b"); -} - -TEST_CASE("hub.max_instance_count") +TEST_CASE("hub.enumerate_and_instance_tracking") { ScopedTemporaryDirectory TempDir; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); @@ -673,97 +2048,58 @@ TEST_CASE("hub.max_instance_count") CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("track_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); - REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason); - CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); - - const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - - HubInstance->Deprovision("max_a", Reason); - CHECK_EQ(HubInstance->GetInstanceCount(), 1); - CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); -} - -TEST_CASE("hub.concurrent") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22000; - Config.InstanceLimit = 10; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - constexpr int kHalf = 3; - - // Serially pre-provision kHalf modules - for (int I = 0; I < kHalf; ++I) { - HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + const Hub::Response R = HubInstance->Provision("track_b", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); - - // Simultaneously: - // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N") - // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N") - // The two pools use distinct OS threads, so provisions and deprovisions are interleaved. - - // Use int rather than bool to avoid std::vector<bool> bitfield packing, - // which would cause data races on concurrent per-index writes. - std::vector<int> ProvisionResults(kHalf, 0); - std::vector<std::string> ProvisionReasons(kHalf); - std::vector<int> DeprovisionResults(kHalf, 0); + CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + // Enumerate both modules { - WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners"); - WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers"); - - std::vector<std::future<void>> ProvisionFutures(kHalf); - std::vector<std::future<void>> DeprovisionFutures(kHalf); + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(Ids.size(), 2u); + CHECK_EQ(ProvisionedCount, 2); + CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end()); + CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end()); + } - for (int I = 0; I < kHalf; ++I) - { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); - } + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - for (std::future<void>& F : ProvisionFutures) - { - F.get(); - } - for (std::future<void>& F : DeprovisionFutures) - { - F.get(); - } - } + // Deprovision one - max instance count must not decrease + HubInstance->Deprovision("track_a"); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); + CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); - for (int I = 0; I < kHalf; ++I) + // Enumerate after deprovision { - CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); - CHECK(DeprovisionResults[I] != 0); + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "track_b"); + CHECK_EQ(ProvisionedCount, 1); } - // Only the newly provisioned modules should remain - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); } TEST_CASE("hub.concurrent_callbacks") @@ -773,23 +2109,9 @@ TEST_CASE("hub.concurrent_callbacks") Config.BasePortNumber = 22300; Config.InstanceLimit = 10; - struct CallbackRecord - { - std::string ModuleId; - uint16_t Port; - }; - RwLock CallbackMutex; - std::vector<CallbackRecord> ProvisionCallbacks; - std::vector<CallbackRecord> DeprovisionCallbacks; - - auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; - auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { - CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); - }; + hub_testutils::StateChangeCapture CaptureInstance; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb)); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc()); constexpr int kHalf = 3; @@ -798,15 +2120,15 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info); + REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message); } CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); { - RwLock::ExclusiveLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - ProvisionCallbacks.clear(); + RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + CaptureInstance.ProvisionCallbacks.clear(); } // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. @@ -823,23 +2145,21 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); + ProvisionFutures[I] = + Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info); + ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; + ProvisionReasons[I] = Result.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = + Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I)); + DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); } for (std::future<void>& F : ProvisionFutures) @@ -863,17 +2183,17 @@ TEST_CASE("hub.concurrent_callbacks") // Each new_* module must have triggered exactly one provision callback with a non-zero port. // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. { - RwLock::SharedLockScope _(CallbackMutex); - REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); - REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf)); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast<size_t>(kHalf)); - for (const CallbackRecord& Record : ProvisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; CHECK_MESSAGE(IsNewModule, Record.ModuleId); } - for (const CallbackRecord& Record : DeprovisionCallbacks) + for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; @@ -894,14 +2214,13 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } @@ -914,22 +2233,639 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } } # endif // ZEN_PLATFORM_WINDOWS +TEST_CASE("hub.hibernate_wake_obliterate") +{ + ScopedTemporaryDirectory TempDir; + Hub::Configuration Config; + Config.BasePortNumber = 22600; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + HubProvisionedInstanceInfo ProvInfo; + Hub::InstanceInfo Info; + + // Error cases on non-existent modules (no provision needed) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + + // Provision + { + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + const std::chrono::system_clock::time_point ProvisionedTime = Info.StateChangeTime; + CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min()); + CHECK_LE(ProvisionedTime, std::chrono::system_clock::now()); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Double-wake on provisioned module is idempotent + CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed); + + // Hibernate + { + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); + const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime; + CHECK_GE(HibernatedTime, ProvisionedTime); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Double-hibernate on already-hibernated module is idempotent + CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed); + + // Wake + { + const Hub::Response R = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + CHECK_GE(Info.StateChangeTime, HibernatedTime); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Hibernate again for obliterate-from-hibernated test + { + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); + + // Obliterate from hibernated + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); + + // Re-provision for obliterate-from-provisioned test + { + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Obliterate from provisioned + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Obliterate deprovisioned module (not tracked by hub, backend data may exist) + { + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Deprovision("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + CHECK_FALSE(HubInstance->Find("hib_a")); + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + + // Obliterate of a never-provisioned module also succeeds (no-op backend cleanup) + CHECK(HubInstance->Obliterate("never_existed").ResponseCode == Hub::EResponseCode::Completed); +} + +TEST_CASE("hub.async_hibernate_wake") +{ + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.BasePortNumber = 23000; + + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); + + HubProvisionedInstanceInfo ProvInfo; + Hub::InstanceInfo Info; + + constexpr auto kPollInterval = std::chrono::milliseconds(50); + constexpr auto kTimeout = std::chrono::seconds(30); + + // Provision and wait until Provisioned + { + const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Ready = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Ready = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout"); + } + + // Hibernate asynchronously and poll until Hibernated + { + const Hub::Response R = HubInstance->Hibernate("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Hibernated = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated) + { + Hibernated = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Wake asynchronously and poll until Provisioned + { + const Hub::Response R = HubInstance->Wake("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Woken = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Woken = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Deprovision asynchronously and poll until the instance is gone + { + const Hub::Response R = HubInstance->Deprovision("async_hib_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout"); +} + +TEST_CASE("hub.recover_process_crash") +{ + ScopedTemporaryDirectory TempDir; + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + }; + + // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. + Hub::Configuration Config; + Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(CaptureFunc)); + + HubProvisionedInstanceInfo Info; + { + const Hub::Response R = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + // Kill the child process to simulate a crash, then poll until the watchdog detects it, + // recovers the instance, and the new process is serving requests. + HubInstance->TerminateModuleForTesting("module_a"); + + constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); + constexpr auto kTimeoutMs = std::chrono::seconds(15); + const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; + + // A successful HTTP health check on the same port confirms the new process is up. + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + bool Recovered = false; + while (std::chrono::steady_clock::now() < Deadline) + { + std::this_thread::sleep_for(kPollIntervalMs); + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && + ModClient.Get("/health/")) + { + CHECK_EQ(InstanceInfo.Port, Info.Port); + Recovered = true; + break; + } + } + REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); + + // Verify the full crash/recovery callback sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_GE(Transitions.size(), 3u); + const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { + return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; + }); + REQUIRE_NE(CrashedIt, Transitions.end()); + const auto RecoveringIt = CrashedIt + 1; + REQUIRE_NE(RecoveringIt, Transitions.end()); + CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); + CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering); + const auto RecoveredIt = RecoveringIt + 1; + REQUIRE_NE(RecoveredIt, Transitions.end()); + CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); + CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); + } + + // After recovery, deprovision should succeed and a re-provision should work. + { + const Hub::Response R = HubInstance->Deprovision("module_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + + HubProvisionedInstanceInfo NewInfo; + { + const Hub::Response R = HubInstance->Provision("module_a", NewInfo); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + CHECK_NE(NewInfo.Port, 0); + HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); + CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); +} + +TEST_CASE("hub.async_provision_concurrent") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.BasePortNumber = 22800; + Config.InstanceLimit = kModuleCount; + + hub_testutils::TestHubPools Pools(4); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + std::vector<std::string> Reasons(kModuleCount); + std::vector<int> Results(kModuleCount, 0); + + { + WorkerThreadPool Callers(kModuleCount, "hub_async_callers"); + std::vector<std::future<void>> Futures(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0; + Reasons[I] = Resp.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + for (std::future<void>& F : Futures) + { + F.get(); + } + } + + for (int I = 0; I < kModuleCount; ++I) + { + REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]); + CHECK_NE(Infos[I].Port, 0); + } + + // Poll until all instances reach Provisioned state + constexpr auto kPollInterval = std::chrono::milliseconds(50); + constexpr auto kTimeout = std::chrono::seconds(30); + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + + bool AllProvisioned = false; + while (std::chrono::steady_clock::now() < Deadline) + { + int ProvisionedCount = 0; + for (int I = 0; I < kModuleCount; ++I) + { + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) + { + ++ProvisionedCount; + } + } + if (ProvisionedCount == kModuleCount) + { + AllProvisioned = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout"); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I)); + } + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); + CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); + } + REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout"); +} + +TEST_CASE("hub.async_provision_shutdown_waits") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.InstanceLimit = kModuleCount; + Config.BasePortNumber = 22900; + + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message); + REQUIRE_NE(Infos[I].Port, 0); + } + + // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up. + HubInstance->Shutdown(); + + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_FALSE(ModClient.Get("/health/")); + } +} + +TEST_CASE("hub.async_provision_rejected") +{ + // Rejection from CanProvisionInstanceLocked fires synchronously even when a WorkerPool is present. + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.InstanceLimit = 1; + Config.BasePortNumber = 23100; + + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); + + HubProvisionedInstanceInfo Info; + + // First provision: dispatched to WorkerPool, returns Accepted + const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message); + REQUIRE_NE(Info.Port, 0); + + // Second provision: CanProvisionInstanceLocked rejects synchronously (limit reached), returns Rejected + HubProvisionedInstanceInfo Info2; + const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2); + CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_FALSE(SecondResult.Message.empty()); + CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); +} + +TEST_CASE("hub.instance.inactivity.deprovision") +{ + ScopedTemporaryDirectory TempDir; + + // Aggressive watchdog settings to keep test duration short. + // Provisioned timeout (2s) > Hibernated timeout (1s) - this is the key invariant under test. + // Margin (1s) means the HTTP activity check fires at LastActivityTime+1s for Provisioned instances. + // The Hibernated branch ignores the margin and uses a direct time-based check. + Hub::Configuration Config; + Config.BasePortNumber = 23200; + Config.InstanceLimit = 3; + Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); + Config.WatchDog.ProvisionedInactivityTimeout = std::chrono::seconds(2); + Config.WatchDog.HibernatedInactivityTimeout = std::chrono::seconds(1); + Config.WatchDog.InactivityCheckMargin = std::chrono::seconds(1); + Config.WatchDog.ActivityCheckConnectTimeout = std::chrono::milliseconds(200); + Config.WatchDog.ActivityCheckRequestTimeout = std::chrono::milliseconds(500); + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + // Provision in order: idle first, idle_hib second (then hibernate), persistent last. + // idle_hib uses the shorter Hibernated timeout (1s) and expires before idle (2s provisioned). + // persistent gets real HTTP PUTs so its activity timer is reset; it must still be alive + // after both idle instances are gone. + + HubProvisionedInstanceInfo IdleInfo; + { + const Hub::Response R = HubInstance->Provision("idle", IdleInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + HubProvisionedInstanceInfo IdleHibInfo; + { + const Hub::Response R = HubInstance->Provision("idle_hib", IdleHibInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + const Hub::Response H = HubInstance->Hibernate("idle_hib"); + REQUIRE_MESSAGE(H.ResponseCode == Hub::EResponseCode::Completed, H.Message); + } + + HubProvisionedInstanceInfo PersistentInfo; + { + const Hub::Response R = HubInstance->Provision("persistent", PersistentInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + auto PokeInstance = [&](uint16_t Port) { + // Make a real storage request to increment the instance's activity sum. + // The watchdog detects the changed sum on the next cycle and resets LastActivityTime. + { + HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), + HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)}); + uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - + std::chrono::steady_clock::time_point::min()) + .count(); + IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); + const HttpClient::Response PutResult = + PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), + IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); + CHECK(PutResult); + } + }; + + PokeInstance(IdleInfo.Port); + PokeInstance(PersistentInfo.Port); + + Sleep(100); + + // Phase 1: immediately after setup all three instances must still be alive. + // No timeout has elapsed yet (only 100ms have passed). + CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); + + CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); + + // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout. + // idle must remain alive - its 2s provisioned timeout has not elapsed yet. + CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle_hib", std::chrono::milliseconds(100), std::chrono::seconds(3)), + "idle_hib was not deprovisioned within its 1s hibernated timeout"); + + CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should be gone after its 1s hibernated timeout elapsed"); + + CHECK_MESSAGE(HubInstance->Find("idle"), + "idle was deprovisioned before its 2s provisioned timeout - only idle_hib's 1s hibernated timeout has elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); + + PokeInstance(PersistentInfo.Port); + + // Phase 3: idle must be deprovisioned by the watchdog within its 2s provisioned timeout. + // persistent must remain alive - its activity timer was reset by PokeInstance. + CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle", std::chrono::milliseconds(100), std::chrono::seconds(4)), + "idle was not deprovisioned within its 2s provisioned timeout"); + + CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2"); + + CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); + + HubInstance->Shutdown(); +} + +TEST_CASE("hub.machine_metrics") +{ + ScopedTemporaryDirectory TempDir; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}); + + // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics + // are available immediately without waiting for a watchdog cycle. + SystemMetrics SysMetrics; + DiskSpace Disk; + HubInstance->GetMachineMetrics(SysMetrics, Disk); + + CHECK_GT(Disk.Total, 0u); + CHECK_LE(Disk.Free, Disk.Total); + + CHECK_GT(SysMetrics.SystemMemoryMiB, 0u); + CHECK_LE(SysMetrics.AvailSystemMemoryMiB, SysMetrics.SystemMemoryMiB); + + CHECK_GT(SysMetrics.VirtualMemoryMiB, 0u); + CHECK_LE(SysMetrics.AvailVirtualMemoryMiB, SysMetrics.VirtualMemoryMiB); +} + +TEST_CASE("hub.provision_rejected_resource_limits") +{ + // The Hub constructor calls UpdateMachineMetrics() synchronously, so CanProvisionInstanceLocked + // can enforce limits immediately without waiting for a watchdog cycle. + ScopedTemporaryDirectory TempDir; + + { + Hub::Configuration Config; + Config.ResourceLimits.DiskUsageBytes = 1; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision("disk_limit", Info); + CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_NE(Result.Message.find("disk usage"), std::string::npos); + } + + { + Hub::Configuration Config; + Config.ResourceLimits.MemoryUsageBytes = 1; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision("mem_limit", Info); + CHECK(Result.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_NE(Result.Message.find("ram usage"), std::string::npos); + } +} + TEST_SUITE_END(); void +Hub::TerminateModuleForTesting(const std::string& ModuleId) +{ + RwLock::SharedLockScope _(m_Lock); + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + return; + } + StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true); + if (Locked) + { + Locked.TerminateForTesting(); + } +} + +void hub_forcelink() { } |