// Copyright Epic Games, Inc. All Rights Reserved. #include "hub.h" #include "storageserverinstance.h" #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include #endif #include namespace zen { /////////////////////////////////////////////////////////////////////////// /** * A timeline of events with sequence IDs and timestamps. Used to * track significant events for broadcasting to listeners. */ class EventTimeline { public: EventTimeline() { m_Events.reserve(1024); } ~EventTimeline() {} EventTimeline(const EventTimeline&) = delete; EventTimeline& operator=(const EventTimeline&) = delete; void RecordEvent(std::string_view EventTag, CbObject EventMetadata) { const uint64_t SequenceId = m_NextEventId++; const auto Now = std::chrono::steady_clock::now(); RwLock::ExclusiveLockScope _(m_Lock); m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); } struct EventRecord { uint64_t SequenceId; std::string Tag; std::chrono::steady_clock::time_point Timestamp; CbObject EventMetadata; EventRecord(uint64_t InSequenceId, std::string_view InTag, std::chrono::steady_clock::time_point InTimestamp, CbObject InEventMetadata = CbObject()) : SequenceId(InSequenceId) , Tag(InTag) , Timestamp(InTimestamp) , EventMetadata(InEventMetadata) { } }; /** * Iterate over events that have a SequenceId greater than SinceEventId * * @param Callback A callable that takes a const EventRecord& * @param SinceEventId The SequenceId to compare against */ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) { // Hold the lock for as short a time as possible eastl::fixed_vector EventsToProcess; m_Lock.WithSharedLock([&] { for (auto& Event : m_Events) { if (Event.SequenceId > SinceEventId) { EventsToProcess.push_back(Event); } } }); // Now invoke the callback outside the lock for (auto& Event : EventsToProcess) { Callback(Event); } } 
	/**
	 * Trim events up to (and including) the given SequenceId. Intended
	 * to be used for cleaning up events which are not longer interesting.
	 *
	 * @param UpToEventId The SequenceId up to which events should be removed
	 */
	void TrimEventsUpTo(uint64_t UpToEventId)
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
			return Event.SequenceId <= UpToEventId;
		});
		m_Events.erase(It, m_Events.end());
	}

private:
	// NOTE(review): template arguments stripped by tooling; presumably
	// std::atomic<uint64_t> and std::vector<EventRecord> -- restore from VCS.
	std::atomic m_NextEventId{0};
	RwLock		m_Lock;
	std::vector m_Events;
};

//////////////////////////////////////////////////////////////////////////

Hub::Hub(const Configuration&				  Config,
		 ZenServerEnvironment&&				  RunEnvironment,
		 WorkerThreadPool*					  OptionalWorkerPool,
		 AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
, m_WorkerPool(OptionalWorkerPool)
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
{
	m_HostMetrics = GetSystemMetrics();
	// Default resource limits: 1000 GiB disk, 16 GiB memory.
	m_ResourceLimits.DiskUsageBytes	  = 1000ull * 1024 * 1024 * 1024;
	m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
	// Fall back to a file-backed hydration target under the run environment
	// when no explicit target specification was configured.
	if (m_Config.HydrationTargetSpecification.empty())
	{
		std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
		ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
		m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native()));
	}
	else
	{
		m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
	}
	m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
	ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
	// NOTE(review): the numeric_limits specialization argument was stripped;
	// presumably std::numeric_limits<uint16_t>::max() (ports are uint16_t).
	ZEN_ASSERT(uint64_t(Config.BasePortNumber) + Config.InstanceLimit <= std::numeric_limits::max());
	m_InstanceLookup.reserve(Config.InstanceLimit);
	m_ActiveInstances.reserve(Config.InstanceLimit);
	// Pre-populate the free-port pool with InstanceLimit consecutive ports
	// starting at BasePortNumber.
	m_FreePorts.resize(Config.InstanceLimit);
	std::iota(m_FreePorts.begin(), m_FreePorts.end(),
			  Config.BasePortNumber);
#if ZEN_PLATFORM_WINDOWS
	// Optionally attach child processes to a Windows job object so they are
	// killed by the OS if the hub process itself dies.
	if (m_Config.UseJobObject)
	{
		m_JobObject.Initialize();
		if (m_JobObject.IsValid())
		{
			ZEN_INFO("Job object initialized for hub service child process management");
		}
		else
		{
			ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash");
		}
	}
#endif
	m_WatchDog = std::thread([this]() { WatchDog(); });
}

Hub::~Hub()
{
	try
	{
		// Safety call - should normally be properly Shutdown by owner
		Shutdown();
	}
	catch (const std::exception& e)
	{
		ZEN_WARN("Exception during hub service shutdown: {}", e.what());
	}
}

void
Hub::Shutdown()
{
	ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
	// Stop the watchdog thread first so it cannot race with the teardown below.
	m_WatchDogEvent.Set();
	if (m_WatchDog.joinable())
	{
		m_WatchDog.join();
	}
	m_WatchDog = {};
	bool Expected = false;
	// Only the first caller to flip the shutdown flag drains background work;
	// repeated Shutdown() calls (e.g. from the destructor) skip the latch dance.
	bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
	if (WaitForBackgroundWork && m_WorkerPool)
	{
		m_BackgroundWorkLatch.CountDown();
		m_BackgroundWorkLatch.Wait();
		// Shutdown flag is set and all background work is drained, safe to shut down remaining instances
		m_BackgroundWorkLatch.Reset(1);
	}
	EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
		ZEN_UNUSED(Info);
		// This might need to be checked to avoid spurious non-relevant warnings...
		try
		{
			const Response DepResp = InternalDeprovision(std::string(ModuleId));
			if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
			{
				ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
		}
	});
	// Drain the async deprovisions scheduled above before returning.
	if (WaitForBackgroundWork && m_WorkerPool)
	{
		m_BackgroundWorkLatch.CountDown();
		m_BackgroundWorkLatch.Wait();
	}
}

Hub::Response
Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
	ZEN_ASSERT(!m_ShutdownFlag.load());
	StorageServerInstance::ExclusiveLockedPtr Instance;
	bool									  IsNewInstance = false;
	uint16_t								  AllocatedPort = 0;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		// Returns a freshly allocated port to the free list if this scope is
		// left before the new instance is registered in m_InstanceLookup.
		auto RestoreAllocatedPort = MakeGuard([this, ModuleId, &IsNewInstance, &AllocatedPort]() {
			if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
			{
				m_FreePorts.push_back(AllocatedPort);
				AllocatedPort = 0;
			}
		});
		if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end())
		{
			// Same operation already in flight -- return the already-allocated port.
			// RestoreAllocatedPort is a no-op here because IsNewInstance is still false
			// (we return before line 273 where it is set), so no port is double-freed.
			OutInfo.Port = It->second;
			return Response{EResponseCode::Accepted};
		}
		if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
		{
			// Unknown module: create a brand new instance, if limits allow it.
			std::string Reason;
			if (!CanProvisionInstance(ModuleId, /* out */ Reason))
			{
				ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
				return Response{EResponseCode::Rejected, Reason};
			}
			IsNewInstance = true;
			AllocatedPort = m_FreePorts.front();
			ZEN_ASSERT(AllocatedPort != 0);
			m_FreePorts.pop_front();
			// NOTE(review): make_unique's template argument was stripped;
			// presumably std::make_unique<StorageServerInstance>(...).
			auto NewInstance = std::make_unique(
				m_RunEnvironment,
				StorageServerInstance::Configuration{.BasePort					   = AllocatedPort,
													 .HydrationTempPath			   = m_HydrationTempPath,
													 .HydrationTargetSpecification = m_HydrationTargetSpecification,
													 .HttpThreadCount			   = m_Config.InstanceHttpThreadCount,
													 .CoreLimit					   = m_Config.InstanceCoreLimit,
													 .ConfigPath				   = m_Config.InstanceConfigPath},
				ModuleId);
#if ZEN_PLATFORM_WINDOWS
			if (m_JobObject.IsValid())
			{
				NewInstance->SetJobObject(&m_JobObject);
			}
#endif
			Instance = NewInstance->LockExclusive(/*Wait*/ true);
			// Reuse a freed slot in m_ActiveInstances when one exists, otherwise append.
			size_t ActiveInstanceIndex = (size_t)-1;
			if (!m_FreeActiveInstanceIndexes.empty())
			{
				ActiveInstanceIndex = m_FreeActiveInstanceIndexes.back();
				m_FreeActiveInstanceIndexes.pop_back();
				ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
				m_ActiveInstances[ActiveInstanceIndex] = std::move(NewInstance);
			}
			else
			{
				ActiveInstanceIndex = m_ActiveInstances.size();
				m_ActiveInstances.emplace_back(std::move(NewInstance));
			}
			ZEN_ASSERT(ActiveInstanceIndex != (size_t)-1);
			m_InstanceLookup.insert_or_assign(std::string(ModuleId), ActiveInstanceIndex);
			ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
			// NOTE(review): narrow_cast's target type argument was stripped;
			// presumably gsl::narrow_cast<int>.
			const int CurrentInstanceCount = gsl::narrow_cast(m_InstanceLookup.size());
			// NOTE(review): a single un-retried compare_exchange_weak may fail
			// spuriously, so the running max can occasionally be missed -- confirm
			// whether a CAS loop (or fetch-max) was intended.
			int		  CurrentMaxCount = m_MaxInstanceCount.load();
			const int NewMax		  = Max(CurrentMaxCount, CurrentInstanceCount);
			m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
		}
		else
		{
			const size_t ActiveInstanceIndex = It->second;
			ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex);
			if (m_RecoveringModules.contains(std::string(ModuleId)))
			{
				ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId);
				return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
			}
			// NOTE(review): template argument stripped; presumably
			// std::unique_ptr<StorageServerInstance>&.
			std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
			ZEN_ASSERT(InstanceRaw);
			if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
			{
				// Already provisioned: report the existing port synchronously.
				OutInfo.Port = InstanceRaw->GetBasePort();
				return Response{EResponseCode::Completed};
			}
			Instance	  = InstanceRaw->LockExclusive(/*Wait*/ true);
			AllocatedPort = InstanceRaw->GetBasePort();
		}
		m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort);
	}
	// NOTE: done while not holding the hub lock, to avoid blocking other operations.
	// m_ProvisioningModules tracks which modules are being provisioned, blocking
	// concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
	ZEN_ASSERT(Instance);
	if (m_WorkerPool)
	{
		m_BackgroundWorkLatch.AddCount(1);
		try
		{
			// NOTE(review): make_shared's template argument was stripped; presumably
			// std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)).
			m_WorkerPool->ScheduleWork(
				[this, ModuleId = std::string(ModuleId), AllocatedPort, IsNewInstance,
				 Instance = std::make_shared(std::move(Instance))]() {
					auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
					try
					{
						CompleteProvision(*Instance, AllocatedPort, IsNewInstance);
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
					}
				},
				WorkerThreadPool::EMode::EnableBacklog);
		}
		catch (const std::exception& DispatchEx)
		{
			// Dispatch failed: undo the latch increment, destroy a newly created
			// instance, and return the port to the pool before rethrowing.
			ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
			m_BackgroundWorkLatch.CountDown();
			if (IsNewInstance)
			{
				try
				{
					AbortProvision(ModuleId);
				}
				catch (const std::exception& DestroyEx)
				{
					ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
				}
			}
			Instance = {};
			{
				RwLock::ExclusiveLockScope _(m_Lock);
				m_ProvisioningModules.erase(std::string(ModuleId));
				if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
				{
					m_FreePorts.push_back(AllocatedPort);
				}
			}
			throw;
		}
	}
	else
	{
		// No worker pool: provision synchronously on the calling thread.
		CompleteProvision(Instance, AllocatedPort, IsNewInstance);
	}
	OutInfo.Port = AllocatedPort;
	return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

void
Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance)
{
	const std::string ModuleId(Instance.GetModuleId());
	uint16_t		  BasePort = Instance.GetBasePort();
	std::string		  BaseUri;	// TODO?
	HubInstanceState  OldState = Instance.GetState();
	HubInstanceState  NewState = OldState;
	// Scope guard that reports OldState -> NewState on exit; presumably routes
	// through OnStateUpdate when the state actually changed -- confirm in hub.h.
	InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
	// Always drop the in-flight marker; only return the port when this was a new
	// instance that never made it into m_InstanceLookup.
	auto RemoveProvisioningModule = MakeGuard([&] {
		RwLock::ExclusiveLockScope _(m_Lock);
		m_ProvisioningModules.erase(std::string(ModuleId));
		if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
		{
			m_FreePorts.push_back(AllocatedPort);
		}
	});
	if (m_ShutdownFlag.load() == false)
	{
		try
		{
			(void)Instance.Provision();	 // false = already in target state (idempotent); not an error
			NewState = Instance.GetState();
			// Success: zero AllocatedPort so the guard above does not recycle a port in use.
			AllocatedPort = 0;
			Instance	  = {};
		}
		catch (const std::exception& Ex)
		{
			ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
		}
	}
	// Instance still held here means provisioning did not complete (shutdown or
	// exception); tear a newly created instance back down.
	if (Instance)
	{
		NewState = Instance.GetState();
		Instance = {};
		if (IsNewInstance)
		{
			try
			{
				AbortProvision(ModuleId);
				NewState = HubInstanceState::Unprovisioned;
			}
			catch (const std::exception& DestroyEx)
			{
				ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
			}
		}
	}
}

void
Hub::AbortProvision(std::string_view ModuleId)
{
	// NOTE(review): template argument stripped; presumably
	// std::unique_ptr<StorageServerInstance>.
	std::unique_ptr DestroyInstance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
		{
			const size_t
				ActiveInstanceIndex = It->second;
			ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
			DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
			ZEN_ASSERT(DestroyInstance);
			ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
			m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
			m_InstanceLookup.erase(It);
		}
		else
		{
			ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId);
		}
	}
	// Destroy outside the hub lock -- instance teardown may be slow.
	DestroyInstance.reset();
}

Hub::Response
Hub::Deprovision(const std::string& ModuleId)
{
	ZEN_ASSERT(!m_ShutdownFlag.load());
	return InternalDeprovision(ModuleId);
}

Hub::Response
Hub::InternalDeprovision(const std::string& ModuleId)
{
	// NOTE(review): template argument stripped; presumably
	// std::unique_ptr<StorageServerInstance>.
	std::unique_ptr							  RawInstance;
	StorageServerInstance::ExclusiveLockedPtr Instance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		// Reject while another state transition on the same module is in flight.
		if (m_ProvisioningModules.contains(ModuleId))
		{
			ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
			return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)};
		}
		if (m_RecoveringModules.contains(ModuleId))
		{
			ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId);
			return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
		}
		if (m_DeprovisioningModules.contains(ModuleId))
		{
			// Same operation already in flight -- idempotent accept.
			return Response{EResponseCode::Accepted};
		}
		if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
		{
			ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
			return Response{EResponseCode::NotFound};
		}
		else
		{
			// Detach the instance from the active set immediately; the port is
			// only returned once teardown completes (see CompleteDeprovision).
			const size_t ActiveInstanceIndex = It->second;
			ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
			RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
			ZEN_ASSERT(RawInstance != nullptr);
			m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
			m_InstanceLookup.erase(It);
			m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort());
			Instance = RawInstance->LockExclusive(/*Wait*/
												  true);
		}
	}
	// NOTE: done while not holding the hub lock, to avoid blocking other operations.
	// m_DeprovisioningModules tracks which modules are being deprovisioned, blocking
	// concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
	ZEN_ASSERT(RawInstance);
	ZEN_ASSERT(Instance);
	if (m_WorkerPool)
	{
		m_BackgroundWorkLatch.AddCount(1);
		try
		{
			// NOTE(review): template arguments stripped; presumably
			// std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(...) and
			// std::shared_ptr<StorageServerInstance>(std::move(RawInstance)).
			m_WorkerPool->ScheduleWork(
				[this, ModuleId = std::string(ModuleId), Instance = std::make_shared(std::move(Instance)),
				 RawInstance = std::shared_ptr(std::move(RawInstance))]() mutable {
					auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
					try
					{
						CompleteDeprovision(*Instance);
						RawInstance.reset();
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
					}
				},
				WorkerThreadPool::EMode::EnableBacklog);
		}
		catch (const std::exception& DispatchEx)
		{
			// Dispatch failed: undo the latch increment, report the transition,
			// and return the port/marker before rethrowing.
			ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
			m_BackgroundWorkLatch.CountDown();
			HubInstanceState		 OldState = Instance.GetState();
			HubInstanceState		 NewState = OldState;
			uint16_t				 BasePort = Instance.GetBasePort();
			InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {});
			// The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
			Instance = {};
			NewState = HubInstanceState::Unprovisioned;
			{
				RwLock::ExclusiveLockScope _(m_Lock);
				m_DeprovisioningModules.erase(std::string(ModuleId));
				m_FreePorts.push_back(BasePort);
			}
			throw;
		}
	}
	else
	{
		// No worker pool: deprovision synchronously on the calling thread.
		CompleteDeprovision(Instance);
		RawInstance.reset();
	}
	return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

void
Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
{
	const std::string ModuleId(Instance.GetModuleId());
	uint16_t		  BasePort = Instance.GetBasePort();
	std::string		  BaseUri;	// TODO?
	HubInstanceState OldState = Instance.GetState();
	HubInstanceState NewState = OldState;
	// Reports the state transition on scope exit (see CompleteProvision).
	InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
	// Always drop the in-flight marker and return the port, success or failure.
	auto _ = MakeGuard([&] {
		{
			RwLock::ExclusiveLockScope _(m_Lock);
			m_DeprovisioningModules.erase(std::string(ModuleId));
			m_FreePorts.push_back(BasePort);
		}
	});
	try
	{
		(void)Instance.Deprovision();
		NewState = Instance.GetState();
		Instance = {};
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
		// The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
		NewState = HubInstanceState::Unprovisioned;
		Instance = {};
		throw;
	}
}

Hub::Response
Hub::Hibernate(const std::string& ModuleId)
{
	ZEN_ASSERT(!m_ShutdownFlag.load());
	StorageServerInstance::ExclusiveLockedPtr Instance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		if (m_HibernatingModules.contains(ModuleId))
		{
			// Same operation already in flight -- idempotent accept.
			return Response{EResponseCode::Accepted};
		}
		if (IsModuleInFlightLocked(ModuleId))
		{
			return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
		}
		auto It = m_InstanceLookup.find(ModuleId);
		if (It == m_InstanceLookup.end())
		{
			return Response{EResponseCode::NotFound};
		}
		const size_t ActiveInstanceIndex = It->second;
		ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
		// NOTE(review): template argument stripped; presumably
		// std::unique_ptr<StorageServerInstance>&.
		std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
		ZEN_ASSERT(InstanceRaw);
		if (InstanceRaw->GetState() == HubInstanceState::Hibernated)
		{
			// Already in the target state.
			return Response{EResponseCode::Completed};
		}
		Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
		m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort());
	}
	// NOTE: done while not holding the hub lock, to avoid blocking other operations.
	// m_HibernatingModules tracks which modules are being hibernated, blocking
	// concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
	ZEN_ASSERT(Instance);
	// Validate state while holding the exclusive instance lock (state cannot change).
	// This gives a synchronous error response for invalid-state calls on both the sync
	// and async paths, matching the existing behaviour.
	if (Instance.GetState() != HubInstanceState::Provisioned)
	{
		const Response HibernateResp =
			Response{EResponseCode::Rejected,
					 fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))};
		Instance = {};
		// Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
		RwLock::ExclusiveLockScope _(m_Lock);
		m_HibernatingModules.erase(ModuleId);
		return HibernateResp;
	}
	if (m_WorkerPool)
	{
		m_BackgroundWorkLatch.AddCount(1);
		try
		{
			// NOTE(review): make_shared's template argument was stripped; presumably
			// std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)).
			m_WorkerPool->ScheduleWork(
				[this, ModuleId = std::string(ModuleId), Instance = std::make_shared(std::move(Instance))]() {
					auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
					try
					{
						CompleteHibernate(*Instance);
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what());
					}
				},
				WorkerThreadPool::EMode::EnableBacklog);
		}
		catch (const std::exception& DispatchEx)
		{
			// Dispatch failed: undo the latch increment and tracking-set membership.
			// State has not changed so no callback is needed.
			// Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
			ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
			m_BackgroundWorkLatch.CountDown();
			Instance = {};
			{
				RwLock::ExclusiveLockScope _(m_Lock);
				m_HibernatingModules.erase(std::string(ModuleId));
			}
			throw;
		}
	}
	else
	{
		// No worker pool: hibernate synchronously on the calling thread.
		CompleteHibernate(Instance);
	}
	return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

void
Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance)
{
	const std::string ModuleId(Instance.GetModuleId());
	uint16_t		  BasePort = Instance.GetBasePort();
	std::string		  BaseUri;	// TODO?
	HubInstanceState OldState = Instance.GetState();
	HubInstanceState NewState = OldState;
	// Reports the state transition on scope exit (see CompleteProvision).
	InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
	// Always drop the in-flight marker, success or failure.
	auto RemoveHibernatingModule = MakeGuard([&] {
		RwLock::ExclusiveLockScope _(m_Lock);
		m_HibernatingModules.erase(std::string(ModuleId));
	});
	try
	{
		if (!Instance.Hibernate())
		{
			ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
			NewState = Instance.GetState();
			Instance = {};
			return;
		}
		NewState = Instance.GetState();
		Instance = {};
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
		NewState = Instance.GetState();
		Instance = {};
		throw;
	}
}

Hub::Response
Hub::Wake(const std::string& ModuleId)
{
	ZEN_ASSERT(!m_ShutdownFlag.load());
	StorageServerInstance::ExclusiveLockedPtr Instance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		if (m_WakingModules.contains(ModuleId))
		{
			// Same operation already in flight -- idempotent accept.
			return Response{EResponseCode::Accepted};
		}
		if (IsModuleInFlightLocked(ModuleId))
		{
			return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
		}
		auto It = m_InstanceLookup.find(ModuleId);
		if (It == m_InstanceLookup.end())
		{
			return Response{EResponseCode::NotFound};
		}
		const size_t ActiveInstanceIndex = It->second;
		ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
		// NOTE(review): template argument stripped; presumably
		// std::unique_ptr<StorageServerInstance>&.
		std::unique_ptr& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
		ZEN_ASSERT(InstanceRaw);
		if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
		{
			// Already in the target state.
			return Response{EResponseCode::Completed};
		}
		Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
		m_WakingModules.emplace(ModuleId, Instance.GetBasePort());
	}
	// NOTE: done while not holding the hub lock, to avoid blocking other operations.
	// m_WakingModules tracks which modules are being woken, blocking
	// concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
	ZEN_ASSERT(Instance);
	// Validate state while holding the exclusive instance lock (state cannot change).
	// This gives a synchronous error response for invalid-state calls on both the sync
	// and async paths, matching the existing behaviour.
	if (Instance.GetState() != HubInstanceState::Hibernated)
	{
		const Response WakeResp =
			Response{EResponseCode::Rejected,
					 fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))};
		Instance = {};
		// Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
		RwLock::ExclusiveLockScope _(m_Lock);
		m_WakingModules.erase(ModuleId);
		return WakeResp;
	}
	if (m_WorkerPool)
	{
		m_BackgroundWorkLatch.AddCount(1);
		try
		{
			// NOTE(review): make_shared's template argument was stripped; presumably
			// std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)).
			m_WorkerPool->ScheduleWork(
				[this, ModuleId = std::string(ModuleId), Instance = std::make_shared(std::move(Instance))]() {
					auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
					try
					{
						CompleteWake(*Instance);
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what());
					}
				},
				WorkerThreadPool::EMode::EnableBacklog);
		}
		catch (const std::exception& DispatchEx)
		{
			// Dispatch failed: undo the latch increment and tracking-set membership.
			// State has not changed so no callback is needed.
			// Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
			ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
			m_BackgroundWorkLatch.CountDown();
			Instance = {};
			{
				RwLock::ExclusiveLockScope _(m_Lock);
				m_WakingModules.erase(std::string(ModuleId));
			}
			throw;
		}
	}
	else
	{
		// No worker pool: wake synchronously on the calling thread.
		CompleteWake(Instance);
	}
	return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
}

void
Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance)
{
	const std::string ModuleId(Instance.GetModuleId());
	uint16_t		  BasePort = Instance.GetBasePort();
	std::string		  BaseUri;	// TODO?
	HubInstanceState OldState = Instance.GetState();
	HubInstanceState NewState = OldState;
	// Reports the state transition on scope exit (see CompleteProvision).
	InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
	// Always drop the in-flight marker, success or failure.
	auto RemoveWakingModule = MakeGuard([&] {
		RwLock::ExclusiveLockScope _(m_Lock);
		m_WakingModules.erase(std::string(ModuleId));
	});
	try
	{
		if (!Instance.Wake())
		{
			ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
			NewState = Instance.GetState();
			Instance = {};
			return;
		}
		NewState = Instance.GetState();
		Instance = {};
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
		NewState = Instance.GetState();
		Instance = {};
		throw;
	}
}

bool
Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo)
{
	RwLock::SharedLockScope _(m_Lock);
	if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
	{
		if (OutInstanceInfo)
		{
			const size_t ActiveInstanceIndex = It->second;
			ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
			// NOTE(review): template argument stripped; presumably
			// const std::unique_ptr<StorageServerInstance>&.
			const std::unique_ptr& Instance = m_ActiveInstances[ActiveInstanceIndex];
			ZEN_ASSERT(Instance);
			InstanceInfo Info{
				Instance->GetState(), std::chrono::system_clock::now()	// TODO
			};
			Instance->GetProcessMetrics(Info.Metrics);
			Info.Port		 = Instance->GetBasePort();
			*OutInstanceInfo = Info;
		}
		return true;
	}
	return false;
}

void
Hub::EnumerateModules(std::function Callback)
{
	// NOTE(review): several '<...>' lists on this function were stripped: the
	// std::function signature above and the std::vector/std::pair element types
	// below (note the stray '>' left behind in 'std::vector>') -- restore from VCS.
	std::vector> Infos;
	{
		// Snapshot the instance info under the shared lock ...
		RwLock::SharedLockScope _(m_Lock);
		for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
		{
			const std::unique_ptr& Instance = m_ActiveInstances[ActiveInstanceIndex];
			ZEN_ASSERT(Instance);
			InstanceInfo Info{
				Instance->GetState(), std::chrono::system_clock::now()	// TODO
			};
			Instance->GetProcessMetrics(Info.Metrics);
			Info.Port = Instance->GetBasePort();
			Infos.push_back(std::make_pair(std::string(Instance->GetModuleId()), Info));
		}
	}
	// ... then invoke the callback outside the lock.
	for (const std::pair& Info : Infos)
	{
		Callback(Info.first, Info.second);
	}
}

int
Hub::GetInstanceCount()
{
	// NOTE(review): narrow_cast's target type argument was stripped here and in
	// UpdateStats/CanProvisionInstance; presumably gsl::narrow_cast<int>.
	return m_Lock.WithSharedLock([this]() { return gsl::narrow_cast(m_InstanceLookup.size()); });
}

void
Hub::UpdateCapacityMetrics()
{
	m_HostMetrics = GetSystemMetrics();
	// TODO: Should probably go into WatchDog and use atomic for update so it can be read without locks...
	// Per-instance stats are already refreshed by WatchDog and are readable via the Find and EnumerateModules
}

void
Hub::UpdateStats()
{
	int CurrentInstanceCount = m_Lock.WithSharedLock([this] { return gsl::narrow_cast(m_InstanceLookup.size()); });
	// NOTE(review): a single un-retried compare_exchange_weak may fail spuriously,
	// so the running max can occasionally be missed -- confirm whether a CAS loop
	// (or fetch-max) was intended.
	int CurrentMaxCount = m_MaxInstanceCount.load();
	int NewMax			= Max(CurrentMaxCount, CurrentInstanceCount);
	m_MaxInstanceCount.compare_exchange_weak(CurrentMaxCount, NewMax);
}

// Checks whether a new instance may be created for ModuleId; on rejection,
// OutReason receives a human-readable explanation. Reads state guarded by
// m_Lock -- called with the hub lock held (see Provision).
bool
Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
{
	if (m_DeprovisioningModules.contains(std::string(ModuleId)))
	{
		OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
		return false;
	}
	if (m_ProvisioningModules.contains(std::string(ModuleId)))
	{
		OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
		return false;
	}
	if (gsl::narrow_cast(m_InstanceLookup.size()) >= m_Config.InstanceLimit)
	{
		OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit);
		return false;
	}
	// Since deprovisioning happens outside the lock and we don't return the port until the instance is fully shut down, we might be below
	// the instance count limit but with no free ports available
	if (m_FreePorts.empty())
	{
		OutReason = fmt::format("no free ports available, deprovisioning of instances might be in flight ({})",
								m_Config.InstanceLimit - m_InstanceLookup.size());
		return false;
	}
	// TODO: handle additional resource metrics
	return true;
}

// True when any state transition (provision/deprovision/hibernate/wake/recover)
// is in flight for ModuleId. Caller must hold m_Lock (hence the "Locked" suffix).
bool
Hub::IsModuleInFlightLocked(std::string_view ModuleId) const
{
	const std::string Key(ModuleId);
	return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) ||
		   m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key);
}

void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
	StorageServerInstance::ExclusiveLockedPtr Instance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		// Bail quietly if the module is mid-transition or no longer tracked.
		if (IsModuleInFlightLocked(ModuleId))
		{
			return;
		}
		auto It = m_InstanceLookup.find(std::string(ModuleId));
		if (It == m_InstanceLookup.end())
		{
			return;
		}
		const size_t ActiveInstanceIndex = It->second;
		ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
		Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
		// Definitive check while hub lock is held: exclusive instance lock is also held,
		// so state cannot change under us. Bail if state changed (e.g. concurrent deprovision)
		// or the process restarted since the watchdog fired.
		if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
		{
			return;
		}
		m_RecoveringModules.emplace(std::string(ModuleId));
	}
	ZEN_ASSERT(Instance);
	const uint16_t	 BasePort = Instance.GetBasePort();
	std::string		 BaseUri;  // TODO?
	HubInstanceState OldState = Instance.GetState();
	HubInstanceState NewState = OldState;
	// Drops the in-flight marker on exit; dismissed on the teardown path below,
	// which erases the marker itself under the same lock as the other cleanup.
	auto RemoveRecoveringModule = MakeGuard([&] {
		RwLock::ExclusiveLockScope _(m_Lock);
		m_RecoveringModules.erase(std::string(ModuleId));
	});
	if (Instance.RecoverFromCrash())
	{
		// Spawn succeeded -- instance is back in Provisioned state.
		NewState = Instance.GetState();
		Instance = {};
		OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
		return;
	}
	// Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down
	// so any salvageable data is preserved.
	try
	{
		(void)Instance.Deprovision();
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
	}
	Instance = {};
	// NOTE(review): template argument stripped; presumably
	// std::unique_ptr<StorageServerInstance>.
	std::unique_ptr DestroyInstance;
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
		{
			const size_t ActiveInstanceIndex = It->second;
			ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
			DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
			m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
			m_InstanceLookup.erase(It);
		}
		// Return the port and clear the recovering marker in the same critical
		// section; the scope guard is then dismissed below.
		m_FreePorts.push_back(BasePort);
		m_RecoveringModules.erase(std::string(ModuleId));
	}
	RemoveRecoveringModule.Dismiss();
	try
	{
		DestroyInstance.reset();
		NewState = HubInstanceState::Unprovisioned;
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what());
	}
	// Notify after all cleanup -- port is back in m_FreePorts and the callback sees
	// a consistent end-state: module gone, transition complete.
	OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
}

void
Hub::WatchDog()
{
	constexpr uint64_t WatchDogWakeupTimeMs		= 5000;
	constexpr uint64_t WatchDogProcessingTimeMs = 500;
	size_t			   CheckInstanceIndex = SIZE_MAX;  // first increment wraps to 0
	// Wait() returning true means the shutdown event fired; otherwise do a
	// bounded round-robin health pass over the instance slots every wakeup.
	while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs))
	{
		try
		{
			// Snapshot slot count. We iterate all slots (including freed nulls) so
			// round-robin coverage is not skewed by deprovisioned entries.
			size_t	  SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); });
			Stopwatch Timer;
			bool	  ShuttingDown = false;
			// Process slots until the batch is exhausted, the per-pass time budget
			// is spent, or shutdown is requested.
			while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown)
			{
				StorageServerInstance::SharedLockedPtr LockedInstance;
				m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() {
					// Advance through null (freed) slots under a single lock acquisition.
					while (SlotsRemaining > 0)
					{
						SlotsRemaining--;
						CheckInstanceIndex++;
						if (CheckInstanceIndex >= m_ActiveInstances.size())
						{
							CheckInstanceIndex = 0;
						}
						StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].get();
						if (Instance)
						{
							// Non-blocking shared lock: skip instances that are busy.
							LockedInstance = Instance->LockShared(/*Wait*/ false);
							break;	// Found a live slot (locked or busy); stop scanning this batch.
						}
					}
				});
				if (!LockedInstance)
				{
					// Either all remaining slots were null, or the live slot's lock was busy -- move on.
					continue;
				}
				if (LockedInstance.IsRunning())
				{
					LockedInstance.UpdateMetrics();
				}
				else if (LockedInstance.GetState() == HubInstanceState::Provisioned)
				{
					// Process is not running but state says it should be - instance died unexpectedly.
					const std::string ModuleId(LockedInstance.GetModuleId());
					// Release the shared lock before recovery, which takes exclusive locks.
					LockedInstance = {};
					AttemptRecoverInstance(ModuleId);
				}
				// else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) - expected, skip.
				LockedInstance = {};
				// Rate-limit: pause briefly between live-instance checks and respond to shutdown.
ShuttingDown = m_WatchDogEvent.Wait(5); } } catch (const std::exception& Ex) { // TODO: Catch specific errors such as asserts, OOM, OOD, system_error etc ZEN_ERROR("Hub watchdog threw exception: {}", Ex.what()); } } } void Hub::OnStateUpdate(std::string_view ModuleId, HubInstanceState OldState, HubInstanceState& NewState, uint16_t BasePort, std::string_view BaseUri) { if (m_ModuleStateChangeCallback && OldState != NewState) { try { m_ModuleStateChangeCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort}, OldState, NewState); } catch (const std::exception& Ex) { ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what()); } } } #if ZEN_WITH_TESTS TEST_SUITE_BEGIN("server.hub"); static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; namespace hub_testutils { ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); } std::unique_ptr MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, WorkerThreadPool* WorkerPool = nullptr) { return std::make_unique(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); } struct CallbackRecord { std::string ModuleId; uint16_t Port; }; struct StateChangeCapture { RwLock CallbackMutex; std::vector ProvisionCallbacks; std::vector DeprovisionCallbacks; auto CaptureFunc() { return [this](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState PreviousState, HubInstanceState NewState) { ZEN_UNUSED(PreviousState); if (NewState == HubInstanceState::Provisioned) { CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); }); } else if (NewState == HubInstanceState::Unprovisioned) { 
                CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
            }
        };
    }
};

}  // namespace hub_testutils

// Synchronous provision/deprovision round trip: instance count, Find(),
// health endpoint up while provisioned and down after deprovision.
TEST_CASE("hub.provision_basic")
{
    ScopedTemporaryDirectory TempDir;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path());
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    CHECK_FALSE(HubInstance->Find("module_a"));
    HubProvisionedInstanceInfo Info;
    const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
    REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
    CHECK_NE(Info.Port, 0);
    CHECK_EQ(HubInstance->GetInstanceCount(), 1);
    Hub::InstanceInfo InstanceInfo;
    REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
    CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
    {
        // Provisioned instance must answer health checks on its port.
        HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
    CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    CHECK_FALSE(HubInstance->Find("module_a"));
    {
        // After deprovision, the port must no longer be serving.
        HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
        CHECK(!ModClient.Get("/health/"));
    }
}

// Provision with an explicit instance config file (Lua) that enables the
// build store, then verify /status/builds responds through the base URI.
TEST_CASE("hub.provision_config")
{
    ScopedTemporaryDirectory TempDir;
    CreateDirectories(TempDir.Path() / "hub");
    std::string LuaConfig =
        "server = {\n"
        "	buildstore = {\n"
        "		enabled = true,\n"
        "	}\n"
        "}\n";
    WriteFile(TempDir.Path() / "config.lua", IoBuffer(IoBuffer::Wrap, LuaConfig.data(), LuaConfig.length()));
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path() / "hub", Hub::Configuration{.InstanceConfigPath = TempDir.Path() / "config.lua"});
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    CHECK_FALSE(HubInstance->Find("module_a"));
    HubProvisionedInstanceInfo Info;
    const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
    REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
    CHECK_NE(Info.Port, 0);
    CHECK_EQ(HubInstance->GetInstanceCount(), 1);
    Hub::InstanceInfo InstanceInfo;
    REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
    CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
    // The instance config enabled the build store; its status endpoint should be live.
    HttpClient Client(fmt::format("http://127.0.0.1:{}{}", Info.Port, Info.BaseUri));
    HttpClient::Response TestResponse = Client.Get("/status/builds");
    CHECK(TestResponse.IsSuccess());
    CHECK(TestResponse.AsObject()["ok"].AsBool());
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
    CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    CHECK_FALSE(HubInstance->Find("module_a"));
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
        CHECK(!ModClient.Get("/health/"));
    }
}

// Verifies the state-change callback fires exactly once for provision and
// once for deprovision, carrying the module id and port.
TEST_CASE("hub.provision_callbacks")
{
    ScopedTemporaryDirectory TempDir;
    hub_testutils::StateChangeCapture CaptureInstance;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
    HubProvisionedInstanceInfo Info;
    const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info);
    REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
    {
        RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
        REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
        CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module");
        CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
        CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0);
    }
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module");
CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } { RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); } } TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; Config.InstanceLimit = 2; Config.BasePortNumber = 21500; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info); REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message); const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info); REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info); CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_EQ(HubInstance->GetInstanceCount(), 2); CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos); HubInstance->Deprovision("limit_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info); CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); } TEST_CASE("hub.enumerate_modules") { ScopedTemporaryDirectory TempDir; std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; { const Hub::Response R = HubInstance->Provision("enum_a", Info); 
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    {
        const Hub::Response R = HubInstance->Provision("enum_b", Info);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    std::vector Ids;
    int ProvisionedCount = 0;
    HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
        Ids.push_back(std::string(ModuleId));
        if (InstanceInfo.State == HubInstanceState::Provisioned)
        {
            ProvisionedCount++;
        }
    });
    // Both modules visible and Provisioned.
    CHECK_EQ(Ids.size(), 2u);
    CHECK_EQ(ProvisionedCount, 2);
    const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
    const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
    CHECK(FoundA);
    CHECK(FoundB);
    // After deprovisioning one module, enumeration reflects only the survivor.
    HubInstance->Deprovision("enum_a");
    Ids.clear();
    ProvisionedCount = 0;
    HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
        Ids.push_back(std::string(ModuleId));
        if (InstanceInfo.State == HubInstanceState::Provisioned)
        {
            ProvisionedCount++;
        }
    });
    REQUIRE_EQ(Ids.size(), 1u);
    CHECK_EQ(Ids[0], "enum_b");
    CHECK_EQ(ProvisionedCount, 1);
}

// Verifies GetMaxInstanceCount() is a high-water mark: it grows with
// provisions and does not shrink after a deprovision.
TEST_CASE("hub.max_instance_count")
{
    ScopedTemporaryDirectory TempDir;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path());
    CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0);
    HubProvisionedInstanceInfo Info;
    {
        const Hub::Response R = HubInstance->Provision("max_a", Info);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
    {
        const Hub::Response R = HubInstance->Provision("max_b", Info);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
    const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
    HubInstance->Deprovision("max_a");
    CHECK_EQ(HubInstance->GetInstanceCount(), 1);
    // High-water mark must be unchanged by the deprovision.
    CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
}

// Exercises concurrent provision + deprovision from two worker pools and
// verifies each operation fires exactly one callback for the right module.
TEST_CASE("hub.concurrent_callbacks")
{
    ScopedTemporaryDirectory TempDir;
    Hub::Configuration Config;
    Config.BasePortNumber = 22300;
    Config.InstanceLimit = 10;
    hub_testutils::StateChangeCapture CaptureInstance;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc());
    constexpr int kHalf = 3;
    // Serially pre-provision kHalf modules and drain the resulting callbacks before the
    // concurrent phase so we start with a clean slate.
    for (int I = 0; I < kHalf; ++I)
    {
        HubProvisionedInstanceInfo Info;
        const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info);
        REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message);
    }
    CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
    {
        RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex);
        REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast(kHalf));
        CaptureInstance.ProvisionCallbacks.clear();
    }
    // Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones.
    std::vector ProvisionResults(kHalf, 0);
    std::vector ProvisionReasons(kHalf);
    std::vector DeprovisionResults(kHalf, 0);
    {
        WorkerThreadPool Provisioners(kHalf, "hub_cbtest_provisioners");
        WorkerThreadPool Deprovisioneers(kHalf, "hub_cbtest_deprovisioneers");
        std::vector> ProvisionFutures(kHalf);
        std::vector> DeprovisionFutures(kHalf);
        for (int I = 0; I < kHalf; ++I)
        {
            ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task([&, I] {
                HubProvisionedInstanceInfo Info;
                const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info);
                ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
                ProvisionReasons[I] = Result.Message;
            }), WorkerThreadPool::EMode::EnableBacklog);
            DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task([&, I] {
                const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I));
                DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
            }), WorkerThreadPool::EMode::EnableBacklog);
        }
        // Wait for all tasks before the pools go out of scope.
        for (std::future& F : ProvisionFutures)
        {
            F.get();
        }
        for (std::future& F : DeprovisionFutures)
        {
            F.get();
        }
    }
    // All operations must have succeeded
    for (int I = 0; I < kHalf; ++I)
    {
        CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]);
        CHECK(DeprovisionResults[I] != 0);
    }
    CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
    // Each new_* module must have triggered exactly one provision callback with a non-zero port.
    // Each pre_* module must have triggered exactly one deprovision callback with a non-zero port.
    {
        RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
        REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast(kHalf));
        REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast(kHalf));
        for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks)
        {
            CHECK_NE(Record.Port, 0);
            const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0;
            CHECK_MESSAGE(IsNewModule, Record.ModuleId);
        }
        for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks)
        {
            CHECK_NE(Record.Port, 0);
            const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0;
            CHECK_MESSAGE(IsPreModule, Record.ModuleId);
        }
    }
}

# if ZEN_PLATFORM_WINDOWS
// Windows-only: provision/deprovision round trip with the job-object option
// both enabled and disabled.
TEST_CASE("hub.job_object")
{
    SUBCASE("UseJobObject=true")
    {
        ScopedTemporaryDirectory TempDir;
        Hub::Configuration Config;
        Config.UseJobObject = true;
        Config.BasePortNumber = 22100;
        std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
        HubProvisionedInstanceInfo Info;
        const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info);
        REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
        CHECK_NE(Info.Port, 0);
        const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a");
        CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
        CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    }
    SUBCASE("UseJobObject=false")
    {
        ScopedTemporaryDirectory TempDir;
        Hub::Configuration Config;
        Config.UseJobObject = false;
        Config.BasePortNumber = 22200;
        std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
        HubProvisionedInstanceInfo Info;
        const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info);
        REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
        CHECK_NE(Info.Port, 0);
        const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a");
        CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
        CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    }
}
# endif // ZEN_PLATFORM_WINDOWS

// Synchronous hibernate/wake lifecycle: health endpoint goes down while
// hibernated and comes back after wake, on the same port.
TEST_CASE("hub.hibernate_wake")
{
    ScopedTemporaryDirectory TempDir;
    Hub::Configuration Config;
    Config.BasePortNumber = 22600;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
    HubProvisionedInstanceInfo ProvInfo;
    Hub::InstanceInfo Info;
    // Provision
    {
        const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    REQUIRE(HubInstance->Find("hib_a", &Info));
    CHECK_EQ(Info.State, HubInstanceState::Provisioned);
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    // Hibernate
    const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a");
    REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message);
    REQUIRE(HubInstance->Find("hib_a", &Info));
    CHECK_EQ(Info.State, HubInstanceState::Hibernated);
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(!ModClient.Get("/health/"));
    }
    // Wake
    const Hub::Response WakeResult = HubInstance->Wake("hib_a");
    REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message);
    REQUIRE(HubInstance->Find("hib_a", &Info));
    CHECK_EQ(Info.State, HubInstanceState::Provisioned);
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    // Deprovision
    const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a");
    CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
    CHECK_FALSE(HubInstance->Find("hib_a"));
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(!ModClient.Get("/health/"));
    }
}

// Error semantics for hibernate/wake/deprovision: unknown modules return
// NotFound; repeated hibernate/wake on the same module is idempotent.
TEST_CASE("hub.hibernate_wake_errors")
{
    ScopedTemporaryDirectory TempDir;
    Hub::Configuration Config;
    Config.BasePortNumber = 22700;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
    HubProvisionedInstanceInfo ProvInfo;
    // Hibernate/wake on a non-existent module - returns NotFound (-> 404)
    CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
    CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
    // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent)
    {
        const Hub::Response R = HubInstance->Provision("err_b", ProvInfo);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    {
        const Hub::Response R = HubInstance->Hibernate("err_b");
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    {
        const Hub::Response HibResp = HubInstance->Hibernate("err_b");
        CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed);
    }
    // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent)
    {
        const Hub::Response R = HubInstance->Wake("err_b");
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    {
        const Hub::Response WakeResp = HubInstance->Wake("err_b");
        CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed);
    }
    // Deprovision not-found - returns NotFound (-> 404)
    CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
}
// With a worker pool attached, provision/hibernate/wake return Accepted and
// complete asynchronously; the test polls Find() until the target state.
TEST_CASE("hub.async_hibernate_wake")
{
    ScopedTemporaryDirectory TempDir;
    Hub::Configuration Config;
    Config.BasePortNumber = 23000;
    WorkerThreadPool WorkerPool(2, "hub_async_hib_wake");
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
    HubProvisionedInstanceInfo ProvInfo;
    Hub::InstanceInfo Info;
    constexpr auto kPollInterval = std::chrono::milliseconds(200);
    constexpr auto kTimeout = std::chrono::seconds(30);
    // Provision and wait until Provisioned
    {
        const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
    }
    {
        const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
        bool Ready = false;
        while (std::chrono::steady_clock::now() < Deadline)
        {
            if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
            {
                Ready = true;
                break;
            }
            std::this_thread::sleep_for(kPollInterval);
        }
        REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout");
    }
    // Hibernate asynchronously and poll until Hibernated
    {
        const Hub::Response R = HubInstance->Hibernate("async_hib_a");
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
    }
    {
        const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
        bool Hibernated = false;
        while (std::chrono::steady_clock::now() < Deadline)
        {
            if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated)
            {
                Hibernated = true;
                break;
            }
            std::this_thread::sleep_for(kPollInterval);
        }
        REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout");
    }
    {
        // Hibernated instance must not be serving.
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(!ModClient.Get("/health/"));
    }
    // Wake asynchronously and poll until Provisioned
    {
        const Hub::Response R = HubInstance->Wake("async_hib_a");
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
    }
    {
        const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
        bool Woken = false;
        while (std::chrono::steady_clock::now() < Deadline)
        {
            if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
            {
                Woken = true;
                break;
            }
            std::this_thread::sleep_for(kPollInterval);
        }
        REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout");
    }
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
        CHECK(ModClient.Get("/health/"));
    }
    // Deprovision
    {
        const Hub::Response R = HubInstance->Deprovision("async_hib_a");
        CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
    }
    CHECK_FALSE(HubInstance->Find("async_hib_a"));
}

// Simulates a child-process crash and verifies the watchdog brings the
// instance back to Provisioned on the same port.
TEST_CASE("hub.recover_process_crash")
{
    ScopedTemporaryDirectory TempDir;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path());
    HubProvisionedInstanceInfo Info;
    {
        const Hub::Response R = HubInstance->Provision("module_a", Info);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    // Kill the child process to simulate a crash, then poll until the watchdog detects it,
    // recovers the instance, and the new process is serving requests.
    HubInstance->TerminateModuleForTesting("module_a");
    constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
    constexpr auto kTimeoutMs = std::chrono::seconds(20);
    const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
    // A successful HTTP health check on the same port confirms the new process is up.
    HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
    bool Recovered = false;
    while (std::chrono::steady_clock::now() < Deadline)
    {
        std::this_thread::sleep_for(kPollIntervalMs);
        Hub::InstanceInfo InstanceInfo;
        if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && ModClient.Get("/health/"))
        {
            // Recovery must reuse the same port - the instance was never removed from the hub's
            // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort.
            CHECK_EQ(InstanceInfo.Port, Info.Port);
            Recovered = true;
            break;
        }
    }
    CHECK_MESSAGE(Recovered, "Instance did not recover within timeout");
}

// Crash, recover, then verify the recovered module can still be cleanly
// deprovisioned and its name re-provisioned afterwards.
TEST_CASE("hub.recover_process_crash_then_deprovision")
{
    ScopedTemporaryDirectory TempDir;
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path());
    HubProvisionedInstanceInfo Info;
    {
        const Hub::Response R = HubInstance->Provision("module_a", Info);
        REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    // Kill the child process, wait for the watchdog to detect and recover the instance.
    HubInstance->TerminateModuleForTesting("module_a");
    constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
    constexpr auto kTimeoutMs = std::chrono::seconds(20);
    const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
    bool Recovered = false;
    while (std::chrono::steady_clock::now() < Deadline)
    {
        std::this_thread::sleep_for(kPollIntervalMs);
        Hub::InstanceInfo InstanceInfo;
        if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
        {
            Recovered = true;
            break;
        }
    }
    REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
    // After recovery, deprovision should succeed and a re-provision should work.
    {
        const Hub::Response R = HubInstance->Deprovision("module_a");
        CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    HubProvisionedInstanceInfo NewInfo;
    {
        const Hub::Response R = HubInstance->Provision("module_a", NewInfo);
        CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
    }
    CHECK_NE(NewInfo.Port, 0);
    HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout);
    CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests");
}

// Floods the hub with concurrent async provisions (Accepted), polls until all
// instances are Provisioned and serving, then deprovisions them all.
TEST_CASE("hub.async_provision_concurrent")
{
    ScopedTemporaryDirectory TempDir;
    constexpr int kModuleCount = 8;
    Hub::Configuration Config;
    Config.BasePortNumber = 22800;
    Config.InstanceLimit = kModuleCount;
    WorkerThreadPool WorkerPool(4, "hub_async_concurrent");
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
    std::vector Infos(kModuleCount);
    std::vector Reasons(kModuleCount);
    std::vector Results(kModuleCount, 0);
    {
        WorkerThreadPool Callers(kModuleCount, "hub_async_callers");
        std::vector> Futures(kModuleCount);
        for (int I = 0; I < kModuleCount; ++I)
        {
            Futures[I] = Callers.EnqueueTask(std::packaged_task([&, I] {
                const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
                Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ?
                    1 : 0;
                Reasons[I] = Resp.Message;
            }), WorkerThreadPool::EMode::EnableBacklog);
        }
        for (std::future& F : Futures)
        {
            F.get();
        }
    }
    // Every async provision must have been Accepted with a port assigned up front.
    for (int I = 0; I < kModuleCount; ++I)
    {
        REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]);
        CHECK_NE(Infos[I].Port, 0);
    }
    // Poll until all instances reach Provisioned state
    constexpr auto kPollInterval = std::chrono::milliseconds(200);
    constexpr auto kTimeout = std::chrono::seconds(30);
    const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
    bool AllProvisioned = false;
    while (std::chrono::steady_clock::now() < Deadline)
    {
        int ProvisionedCount = 0;
        for (int I = 0; I < kModuleCount; ++I)
        {
            Hub::InstanceInfo InstanceInfo;
            if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
            {
                ++ProvisionedCount;
            }
        }
        if (ProvisionedCount == kModuleCount)
        {
            AllProvisioned = true;
            break;
        }
        std::this_thread::sleep_for(kPollInterval);
    }
    CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout");
    for (int I = 0; I < kModuleCount; ++I)
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
        CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I));
    }
    for (int I = 0; I < kModuleCount; ++I)
    {
        const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
        CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
    }
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}

// Verifies Shutdown() waits for in-flight async provisions and tears
// everything down: zero instances and no port left serving.
TEST_CASE("hub.async_provision_shutdown_waits")
{
    ScopedTemporaryDirectory TempDir;
    constexpr int kModuleCount = 8;
    Hub::Configuration Config;
    Config.InstanceLimit = kModuleCount;
    Config.BasePortNumber = 22900;
    WorkerThreadPool WorkerPool(2, "hub_async_shutdown");
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
    std::vector Infos(kModuleCount);
    for (int I = 0; I < kModuleCount; ++I)
    {
        const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
        REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message);
        REQUIRE_NE(Infos[I].Port, 0);
    }
    // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up.
    HubInstance->Shutdown();
    CHECK_EQ(HubInstance->GetInstanceCount(), 0);
    for (int I = 0; I < kModuleCount; ++I)
    {
        HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
        CHECK_FALSE(ModClient.Get("/health/"));
    }
}

TEST_CASE("hub.async_provision_rejected")
{
    // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present.
    ScopedTemporaryDirectory TempDir;
    Hub::Configuration Config;
    Config.InstanceLimit = 1;
    Config.BasePortNumber = 23100;
    WorkerThreadPool WorkerPool(2, "hub_async_rejected");
    std::unique_ptr HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
    HubProvisionedInstanceInfo Info;
    // First provision: dispatched to WorkerPool, returns Accepted
    const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info);
    REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message);
    REQUIRE_NE(Info.Port, 0);
    // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected
    HubProvisionedInstanceInfo Info2;
    const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2);
    CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected);
    CHECK_FALSE(SecondResult.Message.empty());
    CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos);
    CHECK_EQ(HubInstance->GetInstanceCount(), 1);
}

TEST_SUITE_END();

/**
 * Test hook: terminate the child process backing ModuleId (if present),
 * simulating an unexpected crash. No-op for unknown modules.
 */
void Hub::TerminateModuleForTesting(const std::string& ModuleId)
{
    RwLock::SharedLockScope _(m_Lock);
    auto It = m_InstanceLookup.find(ModuleId);
    if (It == m_InstanceLookup.end())
    {
        return;
    }
    // Blocking lock: the test wants a deterministic kill.
    StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second]->LockShared(/*Wait*/ true);
    if (Locked)
    {
        Locked.TerminateForTesting();
    }
}

// Anchor symbol so the linker keeps this translation unit's tests.
void hub_forcelink()
{
}

#endif // ZEN_WITH_TESTS

}  // namespace zen