// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "hubinstancestate.h"
#include "hydration.h"
#include "resourcemetrics.h"
#include "storageserverinstance.h"

// NOTE(review): in this copy of the file two things are missing and must be
// restored from source control:
//   - the header names of the twelve #include directives below;
//   - the template argument lists of several std:: types further down
//     (std::unique_ptr, std::atomic, std::function, std::unordered_map, ...).
// The inline NOTE(review) comments below give hedged guesses only.
// Judging by the std:: names this header uses, the lost headers likely include
// <atomic>, <chrono>, <deque>, <filesystem>, <functional>, <memory>, <string>,
// <string_view>, <thread>, <unordered_map>, <unordered_set> and <vector>.
// Confirm this, since some of the twelve may instead have been project headers
// (CbObject, RwLock, Latch, Event, JobObject, ... are declared elsewhere).
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace zen {

class HttpClient;
struct HttpClientSettings;
class HttpClientShare;
class WorkerThreadPool;

/**
 * Connection details of a provisioned storage server instance.
 *
 * Filled in by Hub::Provision (through its OutInfo parameter) and passed to the
 * module state-change callback (Hub::AsyncModuleStateChangeCallbackFunc).
 */
struct HubProvisionedInstanceInfo
{
    std::string BaseUri;
    // NOTE(review): unlike Hub::InstanceInfo::Port, this field has no default
    // member initializer, so a default-constructed value leaves Port indeterminate.
    uint16_t Port;
};

/**
 * Hub
 *
 * Core logic for managing storage server instances on behalf of external clients.
 * Instances are driven through HubInstanceState transitions by Provision,
 * Deprovision, Obliterate, Hibernate and Wake, and are tracked in per-slot
 * ActiveInstance records addressed by an ActiveInstanceIndex.
 * The class is neither copy-constructible nor copy-assignable.
 */
class Hub
{
public:
    /**
     * Timing parameters for the background watchdog thread (Hub::WatchDog).
     * How each value is applied lives in the implementation; the per-field notes
     * below are inferred from the field names - confirm against hub.cpp.
     */
    struct WatchDogConfiguration
    {
        // Presumably the pause between watchdog passes.
        std::chrono::milliseconds CycleInterval = std::chrono::seconds(3);
        // Presumably the time budget for the work done in a single pass.
        std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500);
        // Presumably a delay inserted between individual instance checks.
        std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5);
        // Presumably the inactivity limits applied to instances in the
        // Provisioned and Hibernated states respectively.
        std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10);
        std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30);
        std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1);
        // Presumably timeouts for the HTTP activity probe that CheckInstanceStatus
        // issues through its ActivityHttpClient.
        std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100);
        std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200);
    };

    /** Settings the hub is constructed with; copied into Hub::m_Config. */
    struct Configuration
    {
        /** Enable or disable the use of a Windows Job Object for child process management.
         * When enabled, all spawned child processes are assigned to a job object with
         * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
         * crashes or is force-killed.
         */
        bool UseJobObject = true;

        // Presumably the first port of the range handed out to instance slots
        // (see GetInstanceIndexAssignedPort) - confirm.
        uint16_t BasePortNumber = 21000;
        // Presumably the maximum number of concurrently managed instances.
        int InstanceLimit = 1000;

        // Per-instance settings. A value of 0 means "Automatic" for the two counts.
        // The string/path settings are presumably forwarded to each spawned
        // instance - confirm in hub.cpp.
        uint32_t InstanceHttpThreadCount = 0; // Automatic
        int InstanceCoreLimit = 0; // Automatic
        std::string InstanceMalloc;
        std::string InstanceTrace;
        std::string InstanceTraceHost;
        std::string InstanceTraceFile;
        std::filesystem::path InstanceConfigPath;

        // Hydration / dehydration settings (types and defaults come from hydration.h).
        std::string HydrationTargetSpecification;
        CbObject HydrationOptions;
        bool EnableHydration = true;
        bool EnableDehydration = true;
        bool HydrationPackEnabled = true;
        uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes;
        uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes;
        // Route S3 hydration through AsyncHttpClient. false falls back to the
        // blocking S3Client path.
        bool HydrationAsyncEnabled = true;
        // Hub-wide cap on concurrent S3 hydration requests (sizes the shared
        // AsyncHttpClient connection pool and the admission semaphore). Only
        // consulted when HydrationAsyncEnabled.
        uint32_t HydrationAsyncMaxConcurrentRequests = 128;

        WatchDogConfiguration WatchDog;
        ResourceMetrics ResourceLimits;

        // Optional caller-supplied worker pools. What the hub does when a pool is
        // left as nullptr is decided in the constructor (presumably it falls back
        // to a default pool) - confirm.
        WorkerThreadPool* OptionalProvisionPool = nullptr;
        WorkerThreadPool* OptionalSpawnPool = nullptr;
        WorkerThreadPool* OptionalHydrationPool = nullptr;
    };

    /**
     * Callback invoked when a module's instance changes state. It receives the
     * module ID, the instance's connection info and the old/new states.
     * Notifications are delivered in order but may run ahead of or behind the
     * hub's internal State flag (see NotifyStateUpdate).
     * NOTE(review): "Async" in the name suggests it may be invoked from worker
     * threads - confirm before touching non-thread-safe state from it.
     */
    typedef std::function< void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState)> AsyncModuleStateChangeCallbackFunc;

    /**
     * @param Config                     Hub configuration; stored by value in m_Config.
     * @param RunEnvironment             Server environment, taken by rvalue reference.
     * @param ModuleStateChangeCallback  Optional state-change callback; empty by default.
     */
    Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
    /** Whether the destructor calls Shutdown() itself is defined in hub.cpp. */
    ~Hub();

    Hub(const Hub&) = delete;
    Hub& operator=(const Hub&) = delete;

    /**
     * Snapshot of one instance as reported by Find (and presumably EnumerateModules).
     * The values are copied from the slot's ActiveInstance atomics, which those
     * readers access lock-free.
     */
    struct InstanceInfo
    {
        HubInstanceState State = HubInstanceState::Unprovisioned;
        std::chrono::system_clock::time_point StateChangeTime;
        ProcessMetrics Metrics;
        uint16_t Port = 0;
    };

    /**
     * Deprovision all running instances.
     * Per the m_InstanceClientShare note below, this also joins the watchdog and
     * drains background work so that Hub destruction can proceed safely.
     */
    void Shutdown();

    /**
     * Outcome of a lifecycle request (Provision, Deprovision, Obliterate, Hibernate, Wake).
     * NOTE(review): the exact meaning of each value is defined in hub.cpp; the
     * names suggest the following - confirm:
     *   - NotFound:  unknown module.
     *   - Rejected:  request refused (see Response::Message).
     *   - Accepted:  work started asynchronously.
     *   - Completed: work finished before returning.
     */
    enum class EResponseCode
    {
        NotFound,
        Rejected,
        Accepted,
        Completed
    };

    /** Result of a lifecycle request. ResponseCode defaults to Rejected. */
    struct Response
    {
        EResponseCode ResponseCode = EResponseCode::Rejected;
        std::string Message; // presumably a human-readable reason/detail
    };

    /**
     * Provision a storage server instance for the given module ID.
     *
     * @param ModuleId The ID of the module to provision.
     * @param OutInfo On success, information about the provisioned instance is returned here.
     * @return The outcome of the request (see EResponseCode).
     */
    Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo);

    /**
     * Deprovision a storage server instance for the given module ID.
     *
     * @param ModuleId The ID of the module to deprovision.
     * @return The outcome of the request (see EResponseCode).
     */
    Response Deprovision(const std::string& ModuleId);

    /**
     * Obliterate a storage server instance and all associated data.
     * Shuts down the process, deletes backend hydration data, and cleans local state.
     *
     * @param ModuleId The ID of the module to obliterate.
     * @return The outcome of the request (see EResponseCode).
     */
    Response Obliterate(const std::string& ModuleId);

    /**
     * Hibernate a storage server instance for the given module ID.
     * The instance is shut down but its data is preserved; it can be woken later.
     *
     * @param ModuleId The ID of the module to hibernate.
     * @return The outcome of the request (see EResponseCode).
     */
    Response Hibernate(const std::string& ModuleId);

    /**
     * Wake a hibernated storage server instance for the given module ID.
     *
     * @param ModuleId The ID of the module to wake.
     * @return The outcome of the request (see EResponseCode).
     */
    Response Wake(const std::string& ModuleId);

    /**
     * Find info about storage server instance for the given module ID.
     *
     * @param ModuleId The ID of the module to find.
     * @param OutInstanceInfo If found, the instance info will be returned here.
     * @return true if the instance was found, false otherwise.
     */
    bool Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo = nullptr);

    /**
     * Enumerate a snapshot of all storage server instances.
     *
     * @param Callback The callback to invoke for each instance.
     */
    // NOTE(review): the std::function signature was lost in this copy; presumably
    // something like void(std::string_view ModuleId, const InstanceInfo&) - confirm.
    void EnumerateModules(std::function Callback);

    /** Number of instances currently managed by the hub (computed in hub.cpp). */
    int GetInstanceCount();

    /**
     * Lock-free read of m_MaxInstanceCount.
     * NOTE(review): whether this is a configured limit or a high-water mark is not
     * visible in this header - confirm in hub.cpp.
     */
    int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); }

    /**
     * Copy out the latest machine-wide metrics held in m_SystemMetrics / m_DiskSpace
     * (presumably refreshed by UpdateMachineMetrics).
     * NOTE(review): parameter name typo "OutSystemMetrict" (intended "OutSystemMetrics");
     * harmless in a declaration.
     */
    void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const;

    /** True if Port belongs to a hub-managed instance slot (presumably via GetInstanceIndexAssignedPort) - confirm. */
    bool IsInstancePort(uint16_t Port) const;

    /** The configuration the hub was constructed with. */
    const Configuration& GetConfig() const { return m_Config; }

    /**
     * Construct a sync HttpClient targeting a hub-managed instance on
     * localhost. Routes through m_InstanceClientShare so DNS and keep-alive
     * TCP connections are reused across all hub->instance traffic, avoiding
     * Windows ephemeral-port exhaustion under provision/deprovision churn.
     * Settings.OptionalShare is unconditionally overwritten.
     */
    HttpClient MakeInstanceClient(uint16_t Port, HttpClientSettings Settings);

#if ZEN_WITH_TESTS
    /** Test-only hook, compiled only when ZEN_WITH_TESTS is enabled. */
    void TerminateModuleForTesting(const std::string& ModuleId);
#endif

private:
    // Copy of the constructor's Configuration.
    const Configuration m_Config;
    ZenServerEnvironment m_RunEnvironment;
    // Worker pools for the two provisioning phases (see the Phase1/Phase2 note below);
    // presumably resolved from Configuration::OptionalProvisionPool / OptionalSpawnPool.
    WorkerThreadPool* m_ProvisionPool = nullptr;
    WorkerThreadPool* m_SpawnPool = nullptr;

    // Declared early so it destructs late: every HttpClient referencing the
    // share (watchdog ActivityCheckClient, GC client, in-flight worker
    // tasks) is required to be gone before this member runs its dtor.
    // Hub::Shutdown enforces that by joining the watchdog and draining
    // background work before Hub destruction begins.
    // NOTE(review): template argument lost in this copy; presumably <HttpClientShare>.
    std::unique_ptr m_InstanceClientShare;

    // Presumably counts outstanding background work that Shutdown drains.
    Latch m_BackgroundWorkLatch;
    // NOTE(review): template argument lost; presumably <bool>.
    std::atomic m_ShutdownFlag = false;
    // Callback supplied to the constructor (presumably invoked from NotifyStateUpdate).
    AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;

    // NOTE(review): template argument lost; the pointee type (from hydration.h) is not recoverable here.
    std::unique_ptr m_Hydration;
    std::filesystem::path m_HydrationTempPath;

#if ZEN_PLATFORM_WINDOWS
    // Windows job object for spawned children (see Configuration::UseJobObject).
    JobObject m_JobObject;
#endif

    // Hub lock. Per the ActiveInstance invariant below, slot creation/destruction
    // happens under its exclusive lock. Declared mutable so it can be taken from
    // const member functions.
    mutable RwLock m_Lock;
    // NOTE(review): template arguments lost; presumably ModuleId -> ActiveInstanceIndex - confirm.
    std::unordered_map m_InstanceLookup;

    // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes.
    // NOTE(review): the std::atomic template arguments were lost in this copy; each
    // should match the type of the corresponding ProcessMetrics field.
    struct AtomicProcessMetrics
    {
        std::atomic MemoryBytes = 0;
        std::atomic KernelTimeMs = 0;
        std::atomic UserTimeMs = 0;
        std::atomic WorkingSetSize = 0;
        std::atomic PeakWorkingSetSize = 0;
        std::atomic PagefileUsage = 0;
        std::atomic PeakPagefileUsage = 0;

        // Copy the atomic fields into / out of a plain ProcessMetrics value.
        ProcessMetrics Load() const;
        void Store(const ProcessMetrics& Metrics);
        // Presumably zeroes every field - confirm.
        void Reset();
    };

    // Per-slot record for one managed instance, addressed by ActiveInstanceIndex.
    // NOTE(review): several template arguments below were lost in this copy:
    //   - Instance: presumably <StorageServerInstance>.
    //   - State: <HubInstanceState>, going by its initializer.
    //   - LastActivityTime / StateChangeTime: <std::chrono::system_clock::time_point>,
    //     going by their initializers.
    //   - LastKnownActivitySum: integer type unknown.
    struct ActiveInstance
    {
        // Invariant: Instance == nullptr if and only if State == Unprovisioned.
        // Both fields are only created/destroyed under the hub's exclusive lock.
        // State is an atomic because the watchdog reads it under a shared instance lock
        // without holding the hub lock.
        std::unique_ptr Instance;
        std::atomic State = HubInstanceState::Unprovisioned;

        // Process metrics - written by WatchDog (inside instance shared lock), read lock-free.
        AtomicProcessMetrics ProcessMetrics;

        // Activity tracking - written by WatchDog, reset on every state transition.
        std::atomic LastKnownActivitySum = 0;
        std::atomic LastActivityTime = std::chrono::system_clock::time_point::min();

        // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules.
        std::atomic StateChangeTime = std::chrono::system_clock::time_point::min();

        // Cached hydration state returned by Hydrate, consumed by Dehydrate. Synchronized
        // by the caller's StorageServerInstance ExclusiveLockedPtr (HydrateInstance and
        // DehydrateInstance run under it); RemoveInstance also writes under the Hub
        // exclusive lock when finalizing slot teardown.
        CbObject HydrationState;
    };

    // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive
    // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State.
    // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below.
    // The lock argument is never used for anything else:
    //   - the locked-pointer overloads only ZEN_ASSERT that it is non-null;
    //   - the hub-lock overload marks it ZEN_UNUSED.
    // Every overload forwards to UpdateInstanceStateLocked.
    HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState NewState)
    {
        ZEN_ASSERT(Instance);
        return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
    }
    HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState NewState)
    {
        ZEN_ASSERT(Instance);
        return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
    }
    HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState)
    {
        ZEN_UNUSED(HubLock);
        return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
    }
    // Performs the actual State write (and, per the ActiveInstance comment, sets StateChangeTime).
    // The returned HubInstanceState is presumably the previous state - confirm.
    HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState);

    // Slot table indexed by ActiveInstanceIndex, plus the indices of unused slots.
    // NOTE(review): template arguments lost; presumably <ActiveInstance> and <size_t> - confirm.
    std::vector m_ActiveInstances;
    std::deque m_FreeActiveInstanceIndexes;

    // Latest machine-wide metrics returned by GetMachineMetrics.
    SystemMetrics m_SystemMetrics;
    DiskSpace m_DiskSpace;
    // NOTE(review): template argument lost; GetMaxInstanceCount returns int, so presumably <int>.
    std::atomic m_MaxInstanceCount = 0;

    // Watchdog thread running WatchDog().
    // NOTE(review): destroying a joinable std::thread calls std::terminate, so this
    // must be joined (Shutdown) before ~Hub completes. Also, m_ObliteratingInstances
    // and m_WatchDogEvent are declared after it and are therefore destroyed before
    // it; the thread must not still be running when member destruction starts.
    std::thread m_WatchDog;
    // NOTE(review): template argument lost; presumably a set of module IDs being obliterated.
    std::unordered_set m_ObliteratingInstances;
    // Presumably signalled to wake or stop the watchdog - confirm.
    Event m_WatchDogEvent;

    // Watchdog thread entry point.
    void WatchDog();
    void UpdateMachineMetrics();
    // Checks one instance (taking ownership of its shared lock) using ActivityHttpClient;
    // the meaning of the bool result is defined in hub.cpp.
    bool CheckInstanceStatus(HttpClient& ActivityHttpClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex);
    void AttemptRecoverInstance(std::string_view ModuleId);
    // The "Locked" suffix presumably means the caller already holds m_Lock; on refusal
    // the reason is written to OutReason - confirm.
    bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason);
    uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;

    // NOTE(review): the std::function signature of DeprovisionGate was lost in this copy;
    // presumably a predicate deciding whether deprovisioning may proceed - confirm.
    Response InternalDeprovision(const std::string& ModuleId, std::function&& DeprovisionGate);

    // Complete<Operation> helpers run with the instance's exclusive lock held
    // (they receive an ExclusiveLockedPtr).
    void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance);
    void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
    void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
    void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
    void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);

    // Provision/Deprovision/Obliterate are split into two phases scheduled on different worker
    // pools. The Phase1/Phase2 helpers are shared between sync and async code paths so behavior
    // cannot diverge between them.
    // RollbackFailedProvision presumably undoes a provision that failed in one of its phases.
    bool RunProvisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance, uint16_t Port);
    void RunProvisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance, uint16_t Port);
    void RollbackFailedProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, bool IsNewInstance, uint16_t Port);
    void RunDeprovisionPhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, uint16_t Port);
    void RunDeprovisionPhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port);
    void RunObliteratePhase1(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port);
    void RunObliteratePhase2(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, uint16_t Port);

    // Finalizes slot teardown; per the ActiveInstance::HydrationState comment it writes
    // that field under the Hub exclusive lock.
    void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId);

    // NOTE(review): template arguments of AbortFlag/PauseFlag lost; presumably std::atomic<bool>.
    HydrationConfig MakeHydrationConfigForModule(std::string_view ModuleId, std::atomic& AbortFlag, std::atomic& PauseFlag) const;
    // Run under the caller's ExclusiveLockedPtr and exchange state through
    // ActiveInstance::HydrationState (see that field's comment).
    void HydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId);
    void DehydrateInstance(size_t ActiveInstanceIndex, std::string_view ModuleId);
    // Presumably deletes the module's backend hydration data (used by Obliterate) - confirm.
    void ObliterateBackendData(std::string_view ModuleId);

    // Notifications may fire slightly out of sync with the Hub's internal State flag.
    // The guarantee is that notifications are sent in the correct order, but the State
    // flag may be updated either before or after the notification fires depending on the
    // code path. Callers must not assume a specific ordering between the two.
    void NotifyStateUpdate(std::string_view ModuleId, HubInstanceState OldState, HubInstanceState NewState, uint16_t BasePort, std::string_view BaseUri);
};

#if ZEN_WITH_TESTS
void hub_forcelink();
#endif // ZEN_WITH_TESTS

} // namespace zen