// Copyright Epic Games, Inc. All Rights Reserved.

#include "hub.h"
#include "storageserverinstance.h"

// NOTE(review): the header names of the following #include directives were lost
// in an extraction/formatting pass -- restore them from version control.
#include
#include
#include
#include
#include
#include
#include

ZEN_THIRD_PARTY_INCLUDES_START
#include
#include
ZEN_THIRD_PARTY_INCLUDES_END

#if ZEN_WITH_TESTS
#	 include
#	 include
#	 include
#	 include
#endif

#include

namespace zen {

///////////////////////////////////////////////////////////////////////////

/**
 * A timeline of events with sequence IDs and timestamps. Used to
 * track significant events for broadcasting to listeners.
 *
 * Thread-safety: RecordEvent/IterateEventsSince/TrimEventsUpTo may be called
 * concurrently; the event list is guarded by m_Lock and sequence ids are
 * allocated from an atomic counter.
 */
class EventTimeline
{
public:
    EventTimeline() { m_Events.reserve(1024); }
    ~EventTimeline() {}

    // Non-copyable: owns a lock and a growing event list.
    EventTimeline(const EventTimeline&) = delete;
    EventTimeline& operator=(const EventTimeline&) = delete;

    /** Append an event with a fresh sequence id and the current steady-clock time.
     *
     * NOTE: the sequence id is allocated *before* the exclusive lock is taken,
     * so under contention events can land in m_Events slightly out of
     * sequence-id order. IterateEventsSince scans the whole list, so this
     * does not affect correctness.
     */
    void RecordEvent(std::string_view EventTag, CbObject EventMetadata)
    {
        const uint64_t SequenceId = m_NextEventId++;
        const auto     Now        = std::chrono::steady_clock::now();

        RwLock::ExclusiveLockScope _(m_Lock);
        m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata));
    }

    /** A single recorded event: monotonically increasing id, tag, timestamp,
     *  and optional structured metadata. */
    struct EventRecord
    {
        uint64_t                              SequenceId;
        std::string                           Tag;
        std::chrono::steady_clock::time_point Timestamp;
        CbObject                              EventMetadata;

        EventRecord(uint64_t                              InSequenceId,
                    std::string_view                      InTag,
                    std::chrono::steady_clock::time_point InTimestamp,
                    CbObject                              InEventMetadata = CbObject())
        : SequenceId(InSequenceId)
        , Tag(InTag)
        , Timestamp(InTimestamp)
        , EventMetadata(InEventMetadata)
        {
        }
    };

    /**
     * Iterate over events that have a SequenceId greater than SinceEventId
     *
     * @param Callback     A callable that takes a const EventRecord&
     * @param SinceEventId The SequenceId to compare against
     */
    void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
    {
        // Hold the lock for as short a time as possible: copy the matching
        // events out under the shared lock, then run the callback unlocked.
        // NOTE(review): fixed_vector lost its template arguments
        // (element type / inline capacity) in a formatting pass.
        eastl::fixed_vector EventsToProcess;

        m_Lock.WithSharedLock([&] {
            for (auto& Event : m_Events)
            {
                if (Event.SequenceId > SinceEventId)
                {
                    EventsToProcess.push_back(Event);
                }
            }
        });

        // Now invoke the callback outside the lock
        for (auto& Event : EventsToProcess)
        {
            Callback(Event);
        }
    }

    /**
     * Trim events up to (and including) the given SequenceId. Intended
     * to be used for cleaning up events which are no longer interesting.
     *
     * @param UpToEventId The SequenceId up to which events should be removed
     */
    void TrimEventsUpTo(uint64_t UpToEventId)
    {
        RwLock::ExclusiveLockScope _(m_Lock);

        // Standard erase-remove idiom keyed on the sequence id.
        auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
            return Event.SequenceId <= UpToEventId;
        });
        m_Events.erase(It, m_Events.end());
    }

private:
    std::atomic m_NextEventId{0};  // next id to hand out; incremented atomically in RecordEvent
    RwLock      m_Lock;            // guards m_Events
    std::vector m_Events;          // recorded events, roughly in sequence-id order
};

//////////////////////////////////////////////////////////////////////////

/** Hub constructor: applies default resource limits, resolves the hydration
 *  target/temp paths, sizes the instance slot tables, and starts the watchdog
 *  thread (continues on the following lines).
 */
Hub::Hub(const Configuration&                 Config,
         ZenServerEnvironment&&               RunEnvironment,
         WorkerThreadPool*                    OptionalWorkerPool,
         AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
, m_WorkerPool(OptionalWorkerPool)
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
, m_ActiveInstances(Config.InstanceLimit)
, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
    m_HostMetrics = GetSystemMetrics();

    // Default resource budget: 1000 GiB disk, 16 GiB memory.
    m_ResourceLimits.DiskUsageBytes   = 1000ull * 1024 * 1024 * 1024;
    m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;

    if (m_Config.HydrationTargetSpecification.empty())
    {
        // No explicit hydration target configured -- fall back to a local
        // directory inside the run environment, expressed as a file:// URI.
        std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
        ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
        m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
    }
    else
    {
        m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
    }

    m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
    ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);

    // Each slot gets port BasePortNumber + index; ensure the whole range fits.
    // NOTE(review): numeric_limits lost its template argument (presumably uint16_t).
    ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits::max());
    m_InstanceLookup.reserve(Config.InstanceLimit);

    // All slot indexes start out free: 0, 1, ..., InstanceLimit-1.
    std::iota(m_FreeActiveInstanceIndexes.begin(),
              m_FreeActiveInstanceIndexes.end(), 0);

#if ZEN_PLATFORM_WINDOWS
    if (m_Config.UseJobObject)
    {
        // Job object ties child process lifetime to the hub process.
        m_JobObject.Initialize();
        if (m_JobObject.IsValid())
        {
            ZEN_INFO("Job object initialized for hub service child process management");
        }
        else
        {
            ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash");
        }
    }
#endif

    // Watchdog runs until m_WatchDogEvent is signalled in Shutdown().
    m_WatchDog = std::thread([this]() { WatchDog(); });
}

Hub::~Hub()
{
    try
    {
        // Safety call - should normally be properly Shutdown by owner
        Shutdown();
    }
    catch (const std::exception& e)
    {
        ZEN_WARN("Exception during hub service shutdown: {}", e.what());
    }
}

/** Stop the watchdog, drain in-flight background work, and deprovision all
 *  remaining instances. Only the caller that flips m_ShutdownFlag waits on the
 *  background-work latch, so a second Shutdown() call is effectively a no-op
 *  apart from the enumeration below.
 */
void Hub::Shutdown()
{
    ZEN_INFO("Hub service shutting down, deprovisioning any current instances");

    // Stop the watchdog thread first so it cannot race the teardown below.
    m_WatchDogEvent.Set();
    if (m_WatchDog.joinable())
    {
        m_WatchDog.join();
    }
    m_WatchDog = {};

    bool Expected              = false;
    bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);

    if (WaitForBackgroundWork && m_WorkerPool)
    {
        m_BackgroundWorkLatch.CountDown();
        m_BackgroundWorkLatch.Wait();
        // Shutdown flag is set and all background work is drained, safe to shut down remaining instances
        m_BackgroundWorkLatch.Reset(1);
    }

    EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
        ZEN_UNUSED(Info);
        // This might need to be checked to avoid spurious non-relevant warnings...
        try
        {
            const Response DepResp = InternalDeprovision(std::string(ModuleId));
            if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
            {
                ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
            }
        }
        catch (const std::exception& Ex)
        {
            ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
        }
    });

    // Second drain: the deprovisions above may have queued background work.
    if (WaitForBackgroundWork && m_WorkerPool)
    {
        m_BackgroundWorkLatch.CountDown();
        m_BackgroundWorkLatch.Wait();
    }
}

/** Provision (create and start) a storage server instance for ModuleId.
 *
 *  Returns Accepted when the work is queued on the worker pool, Completed when
 *  finished synchronously or already provisioned, Rejected/NotFound otherwise.
 *  OutInfo.Port receives the port assigned to the instance slot.
 */
Hub::Response Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
    ZEN_ASSERT(!m_ShutdownFlag.load());

    StorageServerInstance::ExclusiveLockedPtr Instance;
    bool             IsNewInstance       = false;
    size_t           ActiveInstanceIndex = (size_t)-1;
    HubInstanceState OldState            = HubInstanceState::Unprovisioned;

    {
        RwLock::ExclusiveLockScope _(m_Lock);

        if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
        {
            // Unknown module: allocate a free slot and construct a new instance.
            std::string Reason;
            if (!CanProvisionInstance(ModuleId, /* out */ Reason))
            {
                ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
                return Response{EResponseCode::Rejected, Reason};
            }

            IsNewInstance       = true;
            ActiveInstanceIndex = m_FreeActiveInstanceIndexes.front();
            m_FreeActiveInstanceIndexes.pop_front();
            ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);

            try
            {
                // NOTE(review): make_unique lost its template argument
                // (presumably StorageServerInstance) in a formatting pass.
                auto NewInstance = std::make_unique(
                    m_RunEnvironment,
                    StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
                                                         .HydrationTempPath = m_HydrationTempPath,
                                                         .HydrationTargetSpecification = m_HydrationTargetSpecification,
                                                         .HttpThreadCount = m_Config.InstanceHttpThreadCount,
                                                         .CoreLimit = m_Config.InstanceCoreLimit,
                                                         .ConfigPath = m_Config.InstanceConfigPath},
                    ModuleId);
#if ZEN_PLATFORM_WINDOWS
                if (m_JobObject.IsValid())
                {
                    NewInstance->SetJobObject(&m_JobObject);
                }
#endif
                Instance = NewInstance->LockExclusive(/*Wait*/ true);
                m_ActiveInstances[ActiveInstanceIndex].Instance = std::move(NewInstance);
                m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);

                // Set Provisioning while both hub lock and instance lock are held so that any
                // concurrent Deprovision sees the in-flight state, not Unprovisioned.
                OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
            }
            catch (const std::exception&)
            {
                // Roll back the slot reservation on construction/lock failure.
                Instance = {};
                m_ActiveInstances[ActiveInstanceIndex].Instance.reset();
                m_ActiveInstances[ActiveInstanceIndex].State.store(HubInstanceState::Unprovisioned);
                m_InstanceLookup.erase(std::string(ModuleId));
                m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
                throw;
            }

            OutInfo.Port = GetInstanceIndexAssignedPort(ActiveInstanceIndex);
            ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);

            // Track the high-water mark of concurrently provisioned instances.
            // NOTE(review): narrow_cast lost its template argument (presumably int).
            const int CurrentInstanceCount = gsl::narrow_cast(m_InstanceLookup.size());
            int       CurrentMaxCount      = m_MaxInstanceCount.load();
            const int NewMax               = Max(CurrentMaxCount, CurrentInstanceCount);
            m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
        }
        else
        {
            // Module already has a slot; decide based on its current state.
            ActiveInstanceIndex = It->second;
            ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);

            HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();

            std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
            ZEN_ASSERT(InstanceRaw);
            OutInfo.Port = InstanceRaw->GetBasePort();

            switch (CurrentState)
            {
                case HubInstanceState::Provisioning:
                    return Response{EResponseCode::Accepted};
                case HubInstanceState::Crashed:
                case HubInstanceState::Unprovisioned:
                    break;  // fall through to re-provision below
                case HubInstanceState::Provisioned:
                    return Response{EResponseCode::Completed};
                case HubInstanceState::Hibernated:
                    // Hibernated instances are resumed via Wake; release the hub
                    // lock first since Wake takes it again.
                    _.ReleaseNow();
                    return Wake(std::string(ModuleId));
                default:
                    return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
            }

            Instance = InstanceRaw->LockExclusive(/*Wait*/ true);

            // Re-validate state after acquiring the instance lock: a concurrent Provision may have
            // completed between our hub-lock read and LockExclusive, transitioning the state away
            // from Crashed/Unprovisioned.
            HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
            if (ActualState != HubInstanceState::Crashed && ActualState != HubInstanceState::Unprovisioned)
            {
                Instance = {};
                if (ActualState == HubInstanceState::Provisioned)
                {
                    return Response{EResponseCode::Completed};
                }
                if (ActualState == HubInstanceState::Provisioning)
                {
                    return Response{EResponseCode::Accepted};
                }
                return Response{
                    EResponseCode::Rejected,
                    fmt::format("Module '{}' state changed to '{}' before provision could proceed", ModuleId, ToString(ActualState))};
            }

            // Set Provisioning while both hub lock and instance lock are held so that any
            // concurrent Deprovision sees the in-flight state, not Crashed/Unprovisioned.
            OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioning);
        }
    }

    // NOTE: done while not holding the hub lock, to avoid blocking other operations.
    // Both hub-lock paths above set OldState and updated the state to Provisioning before
    // releasing the hub lock, so concurrent operations already see the in-flight state.
    ZEN_ASSERT(Instance);
    ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);

    NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Provisioning, OutInfo.Port, {});

    if (m_WorkerPool)
    {
        // Asynchronous path: finish the provision on the worker pool.
        m_BackgroundWorkLatch.AddCount(1);
        try
        {
            // NOTE(review): make_shared lost its template argument
            // (presumably StorageServerInstance::ExclusiveLockedPtr).
            m_WorkerPool->ScheduleWork(
                [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, IsNewInstance, Instance = std::make_shared(std::move(Instance))]() {
                    auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
                    try
                    {
                        CompleteProvision(*Instance, ActiveInstanceIndex, OldState, IsNewInstance);
                    }
                    catch (const std::exception& Ex)
                    {
                        ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
                    }
                },
                WorkerThreadPool::EMode::EnableBacklog);
        }
        catch (const std::exception& DispatchEx)
        {
            // Dispatch failed: undo latch increment and roll back state.
            ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
            m_BackgroundWorkLatch.CountDown();

            // dispatch failed before the lambda ran, so ActiveInstance::State is still Provisioning
            NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, OutInfo.Port, {});

            std::unique_ptr DestroyInstance;
            {
                RwLock::ExclusiveLockScope HubLock(m_Lock);
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
                if (IsNewInstance)
                {
                    // Brand-new instance: release the slot entirely.
                    DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
                    m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
                    m_InstanceLookup.erase(std::string(ModuleId));
                }
                UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
            }
            // Destroy outside the hub lock.
            DestroyInstance.reset();
            throw;
        }
    }
    else
    {
        // Synchronous path: no worker pool configured.
        CompleteProvision(Instance, ActiveInstanceIndex, OldState, IsNewInstance);
    }

    return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

/** Finish a provision started in Provision(): runs the actual instance startup
 *  outside the hub lock, then publishes the final state. Instance is released
 *  (reset to {}) before returning on every path.
 */
void Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance)
{
    const std::string ModuleId(Instance.GetModuleId());
    const uint16_t    Port = Instance.GetBasePort();
    std::string       BaseUri;  // TODO?

    if (m_ShutdownFlag.load() == false)
    {
        try
        {
            switch (OldState)
            {
                case HubInstanceState::Crashed:
                case HubInstanceState::Unprovisioned:
                    Instance.Provision();
                    break;
                case HubInstanceState::Hibernated:
                    ZEN_ASSERT(false);  // unreachable: Provision redirects Hibernated->Wake before setting Provisioning
                    break;
                default:
                    ZEN_ASSERT(false);
            }

            UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
            NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Provisioned, Port, BaseUri);
            Instance = {};
            return;
        }
        catch (const std::exception& Ex)
        {
            ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
            // Instance will be notified and removed below.
        }
    }

    if (IsNewInstance)
    {
        // Provision failed (or hub is shutting down) for a brand-new instance:
        // publish Unprovisioned and release the slot entirely.
        NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, HubInstanceState::Unprovisioned, Port, {});
        Instance = {};

        std::unique_ptr DestroyInstance;
        {
            RwLock::ExclusiveLockScope HubLock(m_Lock);
            ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
            ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
            DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
            m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
            m_InstanceLookup.erase(std::string(ModuleId));
            UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
        }
        // Destroy outside the hub lock.
        DestroyInstance.reset();
    }
    else
    {
        // OldState = Crashed: restore without cleanup (instance stays in lookup)
        NotifyStateUpdate(ModuleId, HubInstanceState::Provisioning, OldState, Port, {});
        UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
        Instance = {};
    }
}

/** Public deprovision entry point; asserts the hub is not shutting down. */
Hub::Response Hub::Deprovision(const std::string& ModuleId)
{
    ZEN_ASSERT(!m_ShutdownFlag.load());
    return InternalDeprovision(ModuleId);
}

/** Deprovision implementation shared by Deprovision() and Shutdown(). */
Hub::Response Hub::InternalDeprovision(const std::string& ModuleId)
{
    StorageServerInstance::ExclusiveLockedPtr Instance;
    size_t ActiveInstanceIndex = (size_t)-1;

    {
        RwLock::ExclusiveLockScope _(m_Lock);

        if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
        {
            ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
            return Response{EResponseCode::NotFound};
        }
        else
        {
            ActiveInstanceIndex = It->second;
            ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());

            HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();

            switch (CurrentState)
            {
                case HubInstanceState::Deprovisioning:
                    return Response{EResponseCode::Accepted};
                case HubInstanceState::Crashed:
                case HubInstanceState::Hibernated:
                case HubInstanceState::Provisioned:
                    break;  // deprovisionable states
                case HubInstanceState::Unprovisioned:
                    return Response{EResponseCode::Completed};
                case HubInstanceState::Recovering:
                    // Recovering is watchdog-managed; reject to avoid interfering with the in-progress
                    // recovery. The watchdog will transition to Provisioned or Unprovisioned, after
                    // which deprovision can be retried.
                    return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
                default:
                    return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
            }

            std::unique_ptr& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance;
            ZEN_ASSERT(RawInstance != nullptr);
            Instance = RawInstance->LockExclusive(/*Wait*/ true);
        }
    }

    // NOTE: done while not holding the hub lock, to avoid blocking other operations.
    // The exclusive instance lock acquired above prevents concurrent LockExclusive callers
    // from modifying instance state. The state transition to Deprovisioning happens below,
    // after the hub lock is released.
    ZEN_ASSERT(Instance);
    ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);

    HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Deprovisioning);
    const uint16_t   Port     = Instance.GetBasePort();

    NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Deprovisioning, Port, {});

    if (m_WorkerPool)
    {
        // Asynchronous path: finish the deprovision on the worker pool.
        // NOTE(review): shared_ptr/make_shared lost their template arguments
        // (presumably StorageServerInstance::ExclusiveLockedPtr).
        std::shared_ptr SharedInstancePtr = std::make_shared(std::move(Instance));
        m_BackgroundWorkLatch.AddCount(1);
        try
        {
            m_WorkerPool->ScheduleWork(
                [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable {
                    auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
                    try
                    {
                        CompleteDeprovision(*Instance, ActiveInstanceIndex);
                    }
                    catch (const std::exception& Ex)
                    {
                        ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
                    }
                },
                WorkerThreadPool::EMode::EnableBacklog);
        }
        catch (const std::exception& DispatchEx)
        {
            // Dispatch failed: undo latch increment and roll back state.
            ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
            m_BackgroundWorkLatch.CountDown();
            NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, OldState, Port, {});
            {
                RwLock::ExclusiveLockScope HubLock(m_Lock);
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
                UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
            }
            throw;
        }
    }
    else
    {
        // Synchronous path: no worker pool configured.
        CompleteDeprovision(Instance, ActiveInstanceIndex);
    }

    return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

/** Finish a deprovision outside the hub lock: stop the instance, publish
 *  Unprovisioned, and free the slot. */
void Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
    const std::string ModuleId(Instance.GetModuleId());
    const uint16_t    Port = Instance.GetBasePort();

    try
    {
        Instance.Deprovision();
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
        // Effectively unreachable: Shutdown() never throws and Dehydrate() failures are swallowed
        // by DeprovisionLocked. Kept as a safety net; if somehow reached, transition to Crashed
        // so the watchdog can attempt recovery.
        Instance = {};
        {
            RwLock::ExclusiveLockScope HubLock(m_Lock);
            UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed);
        }
        NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Crashed, Port, {});
        throw;
    }

    // Success: publish Unprovisioned, then free the slot under the hub lock.
    NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {});
    Instance = {};

    std::unique_ptr DeleteInstance;
    {
        RwLock::ExclusiveLockScope HubLock(m_Lock);
        auto It = m_InstanceLookup.find(std::string(ModuleId));
        ZEN_ASSERT_SLOW(It != m_InstanceLookup.end());
        ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex);
        DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance);
        m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
        m_InstanceLookup.erase(It);
        UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned);
    }
    // Destroy outside the hub lock.
    DeleteInstance.reset();
}

/** Hibernate a Provisioned instance; its slot and lookup entry are retained.
 *  Returns Accepted (queued), Completed (already hibernated or done
 *  synchronously), NotFound, or Rejected. */
Hub::Response Hub::Hibernate(const std::string& ModuleId)
{
    ZEN_ASSERT(!m_ShutdownFlag.load());

    StorageServerInstance::ExclusiveLockedPtr Instance;
    size_t ActiveInstanceIndex = (size_t)-1;

    {
        RwLock::ExclusiveLockScope _(m_Lock);

        auto It = m_InstanceLookup.find(ModuleId);
        if (It == m_InstanceLookup.end())
        {
            return Response{EResponseCode::NotFound};
        }

        ActiveInstanceIndex = It->second;
        ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());

        HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
        switch (CurrentState)
        {
            case HubInstanceState::Hibernating:
                return Response{EResponseCode::Accepted};
            case HubInstanceState::Provisioned:
                break;  // only Provisioned instances can hibernate
            case HubInstanceState::Hibernated:
                return Response{EResponseCode::Completed};
            default:
                return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
        }

        std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
        ZEN_ASSERT(InstanceRaw);
        Instance = InstanceRaw->LockExclusive(/*Wait*/ true);

        // Re-validate state after acquiring the instance lock: WatchDog may have transitioned
        // Provisioned -> Crashed between our hub-lock read and the LockExclusive call above.
        HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
        if (ActualState != HubInstanceState::Provisioned)
        {
            Instance = {};
            return Response{
                EResponseCode::Rejected,
                fmt::format("Module '{}' state changed to '{}' before hibernate could proceed", ModuleId, ToString(ActualState))};
        }
    }

    // NOTE: done while not holding the hub lock, to avoid blocking other operations.
    // Any concurrent caller that acquired the hub lock and saw Provisioned will now block on
    // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
    // will have already changed the state and the re-validate above will reject it.
    ZEN_ASSERT(Instance);
    ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);

    HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernating);
    const uint16_t   Port     = Instance.GetBasePort();
    NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Hibernating, Port, {});

    if (m_WorkerPool)
    {
        // Asynchronous path: finish the hibernate on the worker pool.
        m_BackgroundWorkLatch.AddCount(1);
        try
        {
            // NOTE(review): make_shared lost its template argument
            // (presumably StorageServerInstance::ExclusiveLockedPtr).
            m_WorkerPool->ScheduleWork(
                [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, Instance = std::make_shared(std::move(Instance))]() {
                    auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
                    try
                    {
                        CompleteHibernate(*Instance, ActiveInstanceIndex, OldState);
                    }
                    catch (const std::exception& Ex)
                    {
                        ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what());
                    }
                },
                WorkerThreadPool::EMode::EnableBacklog);
        }
        catch (const std::exception& DispatchEx)
        {
            // Dispatch failed: undo latch increment and roll back state.
            ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
            m_BackgroundWorkLatch.CountDown();
            NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
            {
                RwLock::ExclusiveLockScope HubLock(m_Lock);
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
                UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
            }
            throw;
        }
    }
    else
    {
        // Synchronous path: no worker pool configured.
        CompleteHibernate(Instance, ActiveInstanceIndex, OldState);
    }

    return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

/** Finish a hibernate outside the hub lock; on failure roll back to OldState
 *  and rethrow. Instance is released on every path. */
void Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
    const std::string ModuleId(Instance.GetModuleId());
    const uint16_t    Port = Instance.GetBasePort();

    try
    {
        Instance.Hibernate();
        UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Hibernated);
        NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, HubInstanceState::Hibernated, Port, {});
        Instance = {};
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
        UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
        NotifyStateUpdate(ModuleId, HubInstanceState::Hibernating, OldState, Port, {});
        Instance = {};
        throw;
    }
}

/** Wake a Hibernated instance back to Provisioned.
 *  Returns Accepted (queued), Completed (already provisioned or done
 *  synchronously), NotFound, or Rejected. */
Hub::Response Hub::Wake(const std::string& ModuleId)
{
    ZEN_ASSERT(!m_ShutdownFlag.load());

    StorageServerInstance::ExclusiveLockedPtr Instance;
    size_t ActiveInstanceIndex = (size_t)-1;

    {
        RwLock::ExclusiveLockScope _(m_Lock);

        auto It = m_InstanceLookup.find(ModuleId);
        if (It == m_InstanceLookup.end())
        {
            return Response{EResponseCode::NotFound};
        }

        ActiveInstanceIndex = It->second;
        ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());

        HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
        switch (CurrentState)
        {
            case
                HubInstanceState::Waking:
                return Response{EResponseCode::Accepted};
            case HubInstanceState::Hibernated:
                break;  // only Hibernated instances can be woken
            case HubInstanceState::Provisioned:
                return Response{EResponseCode::Completed};
            default:
                return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))};
        }

        std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
        ZEN_ASSERT(InstanceRaw);
        Instance = InstanceRaw->LockExclusive(/*Wait*/ true);

        // Re-validate state after acquiring the instance lock: a concurrent Wake or Deprovision may
        // have transitioned Hibernated -> something else between our hub-lock read and LockExclusive.
        HubInstanceState ActualState = m_ActiveInstances[ActiveInstanceIndex].State.load();
        if (ActualState != HubInstanceState::Hibernated)
        {
            Instance = {};
            return Response{EResponseCode::Rejected,
                            fmt::format("Module '{}' state changed to '{}' before wake could proceed", ModuleId, ToString(ActualState))};
        }
    }

    // NOTE: done while not holding the hub lock, to avoid blocking other operations.
    // Any concurrent caller that acquired the hub lock and saw Hibernated will now block on
    // LockExclusive(Wait=true); by the time it acquires the lock, UpdateInstanceState below
    // will have already changed the state and the re-validate above will reject it.
    ZEN_ASSERT(Instance);
    ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);

    HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Waking);
    const uint16_t   Port     = Instance.GetBasePort();
    NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Waking, Port, {});

    if (m_WorkerPool)
    {
        // Asynchronous path: finish the wake on the worker pool.
        m_BackgroundWorkLatch.AddCount(1);
        try
        {
            // NOTE(review): make_shared lost its template argument
            // (presumably StorageServerInstance::ExclusiveLockedPtr).
            m_WorkerPool->ScheduleWork(
                [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, OldState, Instance = std::make_shared(std::move(Instance))]() {
                    auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
                    try
                    {
                        CompleteWake(*Instance, ActiveInstanceIndex, OldState);
                    }
                    catch (const std::exception& Ex)
                    {
                        ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what());
                    }
                },
                WorkerThreadPool::EMode::EnableBacklog);
        }
        catch (const std::exception& DispatchEx)
        {
            // Dispatch failed: undo latch increment and roll back state.
            ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
            m_BackgroundWorkLatch.CountDown();
            NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
            {
                RwLock::ExclusiveLockScope HubLock(m_Lock);
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end());
                ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex);
                UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState);
            }
            throw;
        }
    }
    else
    {
        // Synchronous path: no worker pool configured.
        CompleteWake(Instance, ActiveInstanceIndex, OldState);
    }

    return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

/** Finish a wake outside the hub lock; on failure roll back to OldState and
 *  rethrow. Instance is released on every path. */
void Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
    const std::string ModuleId(Instance.GetModuleId());
    const uint16_t    Port = Instance.GetBasePort();

    try
    {
        Instance.Wake();
        UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned);
        NotifyStateUpdate(ModuleId, HubInstanceState::Waking, HubInstanceState::Provisioned, Port, {});
        Instance = {};
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
        UpdateInstanceState(Instance, ActiveInstanceIndex, OldState);
        NotifyStateUpdate(ModuleId, HubInstanceState::Waking, OldState, Port, {});
        Instance = {};
        throw;
    }
}

/** Look up a module; returns true if it exists and optionally fills
 *  OutInstanceInfo with its current state, metrics and port. */
bool Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
{
    RwLock::SharedLockScope _(m_Lock);

    if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
    {
        if (OutInstanceInfo)
        {
            const size_t ActiveInstanceIndex = It->second;
            ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
            const std::unique_ptr& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
            ZEN_ASSERT(Instance);
            InstanceInfo Info{
                m_ActiveInstances[ActiveInstanceIndex].State.load(),
                std::chrono::system_clock::now()  // TODO
            };
            Instance->GetProcessMetrics(Info.Metrics);
            Info.Port        = Instance->GetBasePort();
            *OutInstanceInfo = Info;
        }
        return true;
    }
    return false;
}

/** Snapshot all modules under the shared lock, then invoke Callback for each
 *  (module id, info) pair outside the lock. */
void Hub::EnumerateModules(std::function Callback)
{
    // NOTE(review): the std::function parameter and the vector element type
    // lost their template arguments in a formatting pass.
    std::vector> Infos;
    {
        RwLock::SharedLockScope _(m_Lock);

        for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
        {
            const std::unique_ptr& Instance = m_ActiveInstances[ActiveInstanceIndex].Instance;
            ZEN_ASSERT(Instance);
            InstanceInfo Info{
                m_ActiveInstances[ActiveInstanceIndex].State.load(),
                std::chrono::system_clock::now()  // TODO
            };
            Instance->GetProcessMetrics(Info.Metrics);
            Info.Port = Instance->GetBasePort();
            Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info));
        }
    }

    // Invoke the callback outside the lock.
    for (const std::pair& Info : Infos)
    {
        Callback(Info.first, Info.second);
    }
}

/** Number of currently registered instances (shared-lock read). */
int Hub::GetInstanceCount()
{
    return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast(m_InstanceLookup.size()); });
}

void Hub::UpdateCapacityMetrics()
{
    m_HostMetrics = GetSystemMetrics();
    // TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks...
    // Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules
}

/** Refresh the max-instance-count high-water mark from the current count. */
void Hub::UpdateStats()
{
    int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast(m_InstanceLookup.size()); });
    int CurrentMaxCount      = m_MaxInstanceCount.load();
    int NewMax               = Max(CurrentMaxCount, CurrentInstanceCount);
    m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
}

/** Check whether a new instance may be provisioned; on failure OutReason
 *  receives a human-readable explanation. Caller must hold the hub lock
 *  (reads m_FreeActiveInstanceIndexes). */
bool Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
{
    ZEN_UNUSED(ModuleId);
    if (m_FreeActiveInstanceIndexes.empty())
    {
        OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
        return false;
    }
    // TODO: handle additional resource metrics
    return true;
}

/** Port assigned to a slot: BasePortNumber + slot index. */
uint16_t Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const
{
    return gsl::narrow(m_Config.BasePortNumber + ActiveInstanceIndex);
}

/** Swap in NewState for a slot and return the previous state. The slow-path
 *  assertion encodes the legal state-machine transitions. */
HubInstanceState Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState)
{
    ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
    // Validate the transition against the allowed state machine (debug only).
    ZEN_ASSERT_SLOW([](HubInstanceState From, HubInstanceState To) {
        switch (From)
        {
            case HubInstanceState::Unprovisioned:
                return To == HubInstanceState::Provisioning;
            case HubInstanceState::Provisioned:
                return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed;
            case HubInstanceState::Hibernated:
                return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning;
            case HubInstanceState::Crashed:
                return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering;
            case HubInstanceState::Provisioning:
                return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed;
            case HubInstanceState::Hibernating:
                return To == HubInstanceState::Hibernated || To == HubInstanceState::Provisioned;
            case HubInstanceState::Waking:
                return To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated;
            case HubInstanceState::Deprovisioning:
                return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Provisioned || To == HubInstanceState::Hibernated ||
                       To == HubInstanceState::Crashed;
            case HubInstanceState::Recovering:
                return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned;
        }
        return false;
    }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
    return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
}

/** Watchdog helper: try to recover a Crashed instance by deprovisioning it
 *  (preserving salvageable data) and then re-provisioning. Silently returns
 *  if the module is gone, not Crashed, or its lock is busy. */
void Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
    StorageServerInstance::ExclusiveLockedPtr Instance;
    size_t ActiveInstanceIndex = (size_t)-1;

    {
        RwLock::ExclusiveLockScope _(m_Lock);

        auto It = m_InstanceLookup.find(std::string(ModuleId));
        if (It == m_InstanceLookup.end())
        {
            return;
        }

        ActiveInstanceIndex = It->second;
        ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());

        std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex].Instance;
        ZEN_ASSERT(InstanceRaw);

        HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
        if (CurrentState != HubInstanceState::Crashed)
        {
            return;
        }

        // Non-blocking lock attempt: don't stall the watchdog behind other ops.
        Instance = m_ActiveInstances[ActiveInstanceIndex].Instance->LockExclusive(/*Wait*/ false);
        if (!Instance)
        {
            // Instance lock is held by another operation; the watchdog will retry on the next cycle if the state is still Crashed.
return; } ZEN_ASSERT(!Instance.IsRunning()); (void)UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Recovering); } ZEN_ASSERT(Instance); ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1); ZEN_ASSERT_SLOW(m_ActiveInstances[ActiveInstanceIndex].State.load() == HubInstanceState::Recovering); NotifyStateUpdate(ModuleId, HubInstanceState::Crashed, HubInstanceState::Recovering, Instance.GetBasePort(), /*BaseUri*/ {}); // Dehydrate before trying to recover so any salvageable data is preserved. try { Instance.Deprovision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); Instance = {}; std::unique_ptr DestroyInstance; { RwLock::ExclusiveLockScope HubLock(m_Lock); auto It = m_InstanceLookup.find(std::string(ModuleId)); ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } DestroyInstance.reset(); return; } try { Instance.Provision(); UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Provisioned); NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Provisioned, Instance.GetBasePort(), /*BaseUri*/ {}); Instance = {}; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to reprovision instance for module '{}' during crash recovery reprovision: {}", ModuleId, Ex.what()); NotifyStateUpdate(ModuleId, HubInstanceState::Recovering, HubInstanceState::Unprovisioned, Instance.GetBasePort(), /*BaseUri*/ {}); Instance = {}; std::unique_ptr DestroyInstance; { RwLock::ExclusiveLockScope 
HubLock(m_Lock); auto It = m_InstanceLookup.find(std::string(ModuleId)); ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); ZEN_ASSERT_SLOW(ActiveInstanceIndex == It->second); DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); (void)UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); } DestroyInstance.reset(); return; } } void Hub::WatchDog() { constexpr uint64_t WatchDogWakeupTimeMs = 3000; constexpr uint64_t WatchDogProcessingTimeMs = 500; size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs)) { try { // Snapshot slot count. We iterate all slots (including freed nulls) so // round-robin coverage is not skewed by deprovisioned entries. size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); }); Stopwatch Timer; bool ShuttingDown = false; while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown) { StorageServerInstance::SharedLockedPtr LockedInstance; m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() { // Advance through null (freed) slots under a single lock acquisition. while (SlotsRemaining > 0) { SlotsRemaining--; CheckInstanceIndex++; if (CheckInstanceIndex >= m_ActiveInstances.size()) { CheckInstanceIndex = 0; } StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].Instance.get(); if (Instance) { LockedInstance = Instance->LockShared(/*Wait*/ false); break; // Found a live slot (locked or busy); stop scanning this batch. } } }); if (!LockedInstance) { // Either all remaining slots were null, or the live slot's lock was busy -- move on. 
continue; } if (LockedInstance.IsRunning()) { LockedInstance.UpdateMetrics(); } else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned) { // Process is not running but state says it should be - instance died unexpectedly. const std::string ModuleId(LockedInstance.GetModuleId()); const uint16_t Port = LockedInstance.GetBasePort(); UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed); NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); LockedInstance = {}; AttemptRecoverInstance(ModuleId); } // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance // lock was busy on a previous cycle and recovery is already pending. LockedInstance = {}; // Rate-limit: pause briefly between live-instance checks and respond to shutdown. ShuttingDown = m_WatchDogEvent.Wait(5); } } catch (const std::exception& Ex) { // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what()); } } } void Hub::NotifyStateUpdate(std::string_view ModuleId, HubInstanceState OldState, HubInstanceState NewState, uint16_t BasePort, std::string_view BaseUri) { if (m_ModuleStateChangeCallback && OldState != NewState) { try { m_ModuleStateChangeCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort}, OldState, NewState); } catch (const std::exception& Ex) { ZEN_ERROR("Module state change callback for module '{}' failed. 
Reason: '{}'", ModuleId, Ex.what()); } } } #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; namespace hub_testutils { ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); } std::unique_ptr MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, WorkerThreadPool* WorkerPool = nullptr) { return std::make_unique(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); } struct CallbackRecord { std::string ModuleId; uint16_t Port; }; struct StateChangeCapture { RwLock CallbackMutex; std::vector ProvisionCallbacks; std::vector DeprovisionCallbacks; auto CaptureFunc() { return [this](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState PreviousState, HubInstanceState NewState) { ZEN_UNUSED(PreviousState); if (NewState == HubInstanceState::Provisioned) { CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); } else if (NewState == HubInstanceState::Unprovisioned) { CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); } }; } }; // Poll until Find() returns false for the given module (i.e. async deprovision completes). 
static bool WaitForInstanceGone(Hub& HubInstance, std::string_view ModuleId, std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), std::chrono::seconds Timeout = std::chrono::seconds(30)) { const auto Deadline = std::chrono::steady_clock::now() + Timeout; while (std::chrono::steady_clock::now() < Deadline) { if (!HubInstance.Find(ModuleId)) { return true; } std::this_thread::sleep_for(PollInterval); } return !HubInstance.Find(ModuleId); } // Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete). static bool WaitForInstanceCount(Hub& HubInstance, int ExpectedCount, std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), std::chrono::seconds Timeout = std::chrono::seconds(30)) { const auto Deadline = std::chrono::steady_clock::now() + Timeout; while (std::chrono::steady_clock::now() < Deadline) { if (HubInstance.GetInstanceCount() == ExpectedCount) { return true; } std::this_thread::sleep_for(PollInterval); } return HubInstance.GetInstanceCount() == ExpectedCount; } } // namespace hub_testutils TEST_CASE("hub.provision_basic") { ScopedTemporaryDirectory TempDir; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path()); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); CHECK(DeprovisionResult.ResponseCode == 
Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } } TEST_CASE("hub.provision_config") { ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path() / "hub"); std::string LuaConfig = "server = {\n" " buildstore = {\n" " enabled = true,\n" " }\n" "}\n"; WriteFile(TempDir.Path() / "config.lua", IoBuffer(IoBuffer::Wrap, LuaConfig.data(), LuaConfig.length())); std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path() / "hub", Hub::Configuration{.InstanceConfigPath = TempDir.Path() / "config.lua"}); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri)); HttpClient::Response TestResponse = Client.Get("/status/builds"); CHECK(TestResponse.IsSuccess()); CHECK(TestResponse.AsObject()["ok"].AsBool()); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } } TEST_CASE("hub.provision_callbacks") { 
ScopedTemporaryDirectory TempDir; hub_testutils::StateChangeCapture CaptureInstance; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); { RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); } { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } { RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); } } TEST_CASE("hub.provision_callback_sequence") { ScopedTemporaryDirectory TempDir; struct TransitionRecord { HubInstanceState OldState; HubInstanceState NewState; }; RwLock CaptureMutex; std::vector Transitions; auto CaptureFunc = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { ZEN_UNUSED(ModuleId); ZEN_UNUSED(Info); CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); }; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), 
{}, std::move(CaptureFunc)); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("seq_module", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } { const Hub::Response R = HubInstance->Deprovision("seq_module"); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } RwLock::SharedLockScope _(CaptureMutex); REQUIRE_EQ(Transitions.size(), 4u); CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); } TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.InstanceLimit = 2; Config.BasePortNumber = 21500; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info); REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message); const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info); REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info); CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_EQ(HubInstance->GetInstanceCount(), 2); CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos); HubInstance->Deprovision("limit_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); const Hub::Response 
FourthResult = HubInstance->Provision("limit_d", Info); CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); } TEST_CASE("hub.enumerate_modules") { ScopedTemporaryDirectory TempDir; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("enum_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } { const Hub::Response R = HubInstance->Provision("enum_b", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } std::vector Ids; int ProvisionedCount = 0; HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); if (InstanceInfo.State == HubInstanceState::Provisioned) { ProvisionedCount++; } }); CHECK_EQ(Ids.size(), 2u); CHECK_EQ(ProvisionedCount, 2); const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); CHECK(FoundA); CHECK(FoundB); HubInstance->Deprovision("enum_a"); Ids.clear(); ProvisionedCount = 0; HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); if (InstanceInfo.State == HubInstanceState::Provisioned) { ProvisionedCount++; } }); REQUIRE_EQ(Ids.size(), 1u); CHECK_EQ(Ids[0], "enum_b"); CHECK_EQ(ProvisionedCount, 1); } TEST_CASE("hub.max_instance_count") { ScopedTemporaryDirectory TempDir; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path()); CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("max_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); { const Hub::Response 
R = HubInstance->Provision("max_b", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); HubInstance->Deprovision("max_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); } TEST_CASE("hub.concurrent_callbacks") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.BasePortNumber = 22300; Config.InstanceLimit = 10; hub_testutils::StateChangeCapture CaptureInstance; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc()); constexpr int kHalf = 3; // Serially pre-provision kHalf modules and drain the resulting callbacks before the // concurrent phase so we start with a clean slate. for (int I = 0; I < kHalf; ++I) { HubProvisionedInstanceInfo Info; const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info); REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message); } CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); { RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex); REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast(kHalf)); CaptureInstance.ProvisionCallbacks.clear(); } // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones. 
std::vector ProvisionResults(kHalf, 0); std::vector ProvisionReasons(kHalf); std::vector DeprovisionResults(kHalf, 0); { WorkerThreadPool Provisioners(kHalf, "hub_cbtest_provisioners"); WorkerThreadPool Deprovisioneers(kHalf, "hub_cbtest_deprovisioneers"); std::vector> ProvisionFutures(kHalf); std::vector> DeprovisionFutures(kHalf); for (int I = 0; I < kHalf; ++I) { ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task([&, I] { HubProvisionedInstanceInfo Info; const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info); ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; ProvisionReasons[I] = Result.Message; }), WorkerThreadPool::EMode::EnableBacklog); DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task([&, I] { const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I)); DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; }), WorkerThreadPool::EMode::EnableBacklog); } for (std::future& F : ProvisionFutures) { F.get(); } for (std::future& F : DeprovisionFutures) { F.get(); } } // All operations must have succeeded for (int I = 0; I < kHalf; ++I) { CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); CHECK(DeprovisionResults[I] != 0); } CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); // Each new_* module must have triggered exactly one provision callback with a non-zero port. // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port. 
{ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast(kHalf)); REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast(kHalf)); for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0; CHECK_MESSAGE(IsNewModule, Record.ModuleId); } for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks) { CHECK_NE(Record.Port, 0); const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0; CHECK_MESSAGE(IsPreModule, Record.ModuleId); } } } # if ZEN_PLATFORM_WINDOWS TEST_CASE("hub.job_object") { SUBCASE("UseJobObject=true") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.UseJobObject = true; Config.BasePortNumber = 22100; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } SUBCASE("UseJobObject=false") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.UseJobObject = false; Config.BasePortNumber = 22200; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a"); CHECK(DeprovisionResult.ResponseCode == 
Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } } # endif // ZEN_PLATFORM_WINDOWS TEST_CASE("hub.hibernate_wake") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.BasePortNumber = 22600; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; // Provision { const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } // Hibernate const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } // Wake const Hub::Response WakeResult = HubInstance->Wake("hib_a"); REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } // Deprovision const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_FALSE(HubInstance->Find("hib_a")); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } } TEST_CASE("hub.hibernate_wake_errors") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; 
Config.BasePortNumber = 22700; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; // Hibernate/wake on a non-existent module - returns NotFound (-> 404) CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) { const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } { const Hub::Response R = HubInstance->Hibernate("err_b"); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } { const Hub::Response HibResp = HubInstance->Hibernate("err_b"); CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); } // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) { const Hub::Response R = HubInstance->Wake("err_b"); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } { const Hub::Response WakeResp = HubInstance->Wake("err_b"); CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); } // Deprovision not-found - returns NotFound (-> 404) CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); } TEST_CASE("hub.async_hibernate_wake") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.BasePortNumber = 23000; WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; constexpr auto kPollInterval = std::chrono::milliseconds(200); constexpr auto kTimeout = std::chrono::seconds(30); // Provision and wait until Provisioned { const Hub::Response R = HubInstance->Provision("async_hib_a", 
ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } { const auto Deadline = std::chrono::steady_clock::now() + kTimeout; bool Ready = false; while (std::chrono::steady_clock::now() < Deadline) { if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) { Ready = true; break; } std::this_thread::sleep_for(kPollInterval); } REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout"); } // Hibernate asynchronously and poll until Hibernated { const Hub::Response R = HubInstance->Hibernate("async_hib_a"); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } { const auto Deadline = std::chrono::steady_clock::now() + kTimeout; bool Hibernated = false; while (std::chrono::steady_clock::now() < Deadline) { if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated) { Hibernated = true; break; } std::this_thread::sleep_for(kPollInterval); } REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout"); } { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } // Wake asynchronously and poll until Provisioned { const Hub::Response R = HubInstance->Wake("async_hib_a"); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } { const auto Deadline = std::chrono::steady_clock::now() + kTimeout; bool Woken = false; while (std::chrono::steady_clock::now() < Deadline) { if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) { Woken = true; break; } std::this_thread::sleep_for(kPollInterval); } REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout"); } { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); CHECK(ModClient.Get("/health/")); } // Deprovision asynchronously and poll until the instance is gone { const 
Hub::Response R = HubInstance->Deprovision("async_hib_a"); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); } REQUIRE_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "async_hib_a"), "Instance did not deprovision within timeout"); } TEST_CASE("hub.recover_process_crash") { ScopedTemporaryDirectory TempDir; struct TransitionRecord { HubInstanceState OldState; HubInstanceState NewState; }; RwLock CaptureMutex; std::vector Transitions; auto CaptureFunc = [&](std::string_view, const HubProvisionedInstanceInfo&, HubInstanceState OldState, HubInstanceState NewState) { CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); }; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } // Kill the child process to simulate a crash, then poll until the watchdog detects it, // recovers the instance, and the new process is serving requests. HubInstance->TerminateModuleForTesting("module_a"); constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); constexpr auto kTimeoutMs = std::chrono::seconds(20); const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; // A successful HTTP health check on the same port confirms the new process is up. HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); bool Recovered = false; while (std::chrono::steady_clock::now() < Deadline) { std::this_thread::sleep_for(kPollIntervalMs); Hub::InstanceInfo InstanceInfo; if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && ModClient.Get("/health/")) { // Recovery must reuse the same port - the instance was never removed from the hub's // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort. 
CHECK_EQ(InstanceInfo.Port, Info.Port); Recovered = true; break; } } CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); // Verify the full crash/recovery callback sequence { RwLock::SharedLockScope _(CaptureMutex); REQUIRE_GE(Transitions.size(), 3u); // Find the Provisioned->Crashed transition const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; }); REQUIRE_NE(CrashedIt, Transitions.end()); // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned const auto RecoveringIt = CrashedIt + 1; REQUIRE_NE(RecoveringIt, Transitions.end()); CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); CHECK_EQ(RecoveringIt->NewState, HubInstanceState::Recovering); const auto RecoveredIt = RecoveringIt + 1; REQUIRE_NE(RecoveredIt, Transitions.end()); CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); } } TEST_CASE("hub.recover_process_crash_then_deprovision") { ScopedTemporaryDirectory TempDir; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } // Kill the child process, wait for the watchdog to detect and recover the instance. 
HubInstance->TerminateModuleForTesting("module_a"); constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); constexpr auto kTimeoutMs = std::chrono::seconds(20); const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; bool Recovered = false; while (std::chrono::steady_clock::now() < Deadline) { std::this_thread::sleep_for(kPollIntervalMs); Hub::InstanceInfo InstanceInfo; if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) { Recovered = true; break; } } REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. { const Hub::Response R = HubInstance->Deprovision("module_a"); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_EQ(HubInstance->GetInstanceCount(), 0); HubProvisionedInstanceInfo NewInfo; { const Hub::Response R = HubInstance->Provision("module_a", NewInfo); CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_NE(NewInfo.Port, 0); HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); } TEST_CASE("hub.async_provision_concurrent") { ScopedTemporaryDirectory TempDir; constexpr int kModuleCount = 8; Hub::Configuration Config; Config.BasePortNumber = 22800; Config.InstanceLimit = kModuleCount; WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); std::vector Infos(kModuleCount); std::vector Reasons(kModuleCount); std::vector Results(kModuleCount, 0); { WorkerThreadPool Callers(kModuleCount, "hub_async_callers"); std::vector> Futures(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { Futures[I] = Callers.EnqueueTask(std::packaged_task([&, I] { const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", 
I), Infos[I]); Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0; Reasons[I] = Resp.Message; }), WorkerThreadPool::EMode::EnableBacklog); } for (std::future& F : Futures) { F.get(); } } for (int I = 0; I < kModuleCount; ++I) { REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]); CHECK_NE(Infos[I].Port, 0); } // Poll until all instances reach Provisioned state constexpr auto kPollInterval = std::chrono::milliseconds(200); constexpr auto kTimeout = std::chrono::seconds(30); const auto Deadline = std::chrono::steady_clock::now() + kTimeout; bool AllProvisioned = false; while (std::chrono::steady_clock::now() < Deadline) { int ProvisionedCount = 0; for (int I = 0; I < kModuleCount; ++I) { Hub::InstanceInfo InstanceInfo; if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) { ++ProvisionedCount; } } if (ProvisionedCount == kModuleCount) { AllProvisioned = true; break; } std::this_thread::sleep_for(kPollInterval); } CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout"); for (int I = 0; I < kModuleCount; ++I) { HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I)); } for (int I = 0; I < kModuleCount; ++I) { const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); } REQUIRE_MESSAGE(hub_testutils::WaitForInstanceCount(*HubInstance, 0), "Not all instances deprovisioned within timeout"); } TEST_CASE("hub.async_provision_shutdown_waits") { ScopedTemporaryDirectory TempDir; constexpr int kModuleCount = 8; Hub::Configuration Config; Config.InstanceLimit = kModuleCount; Config.BasePortNumber = 22900; WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), 
Config, {}, &WorkerPool); std::vector Infos(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message); REQUIRE_NE(Infos[I].Port, 0); } // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up. HubInstance->Shutdown(); CHECK_EQ(HubInstance->GetInstanceCount(), 0); for (int I = 0; I < kModuleCount; ++I) { HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); CHECK_FALSE(ModClient.Get("/health/")); } } TEST_CASE("hub.async_provision_rejected") { // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present. ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.InstanceLimit = 1; Config.BasePortNumber = 23100; WorkerThreadPool WorkerPool(2, "hub_async_rejected"); std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); HubProvisionedInstanceInfo Info; // First provision: dispatched to WorkerPool, returns Accepted const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info); REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message); REQUIRE_NE(Info.Port, 0); // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected HubProvisionedInstanceInfo Info2; const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2); CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_FALSE(SecondResult.Message.empty()); CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos); CHECK_EQ(HubInstance->GetInstanceCount(), 1); } TEST_SUITE_END(); void Hub::TerminateModuleForTesting(const std::string& ModuleId) { RwLock::SharedLockScope _(m_Lock); auto It = m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { 
return; } StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second].Instance->LockShared(/*Wait*/ true); if (Locked) { Locked.TerminateForTesting(); } } void hub_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen