diff options
Diffstat (limited to 'src/zenserver/hub/hub.h')
| -rw-r--r-- | src/zenserver/hub/hub.h | 262 |
1 files changed, 220 insertions, 42 deletions
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 78be3eda1..40d046ce0 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -2,21 +2,28 @@ #pragma once +#include "hubinstancestate.h" #include "resourcemetrics.h" +#include "storageserverinstance.h" +#include <zencore/compactbinary.h> +#include <zencore/filesystem.h> #include <zencore/system.h> #include <zenutil/zenserverprocess.h> +#include <chrono> #include <deque> #include <filesystem> #include <functional> #include <memory> +#include <thread> #include <unordered_map> #include <unordered_set> namespace zen { -class StorageServerInstance; +class HttpClient; +class WorkerThreadPool; /** * Hub @@ -33,104 +40,275 @@ struct HubProvisionedInstanceInfo class Hub { public: + struct WatchDogConfiguration + { + std::chrono::milliseconds CycleInterval = std::chrono::seconds(3); + std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500); + std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5); + std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10); + std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30); + std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1); + + std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100); + std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200); + }; + struct Configuration { /** Enable or disable the use of a Windows Job Object for child process management. * When enabled, all spawned child processes are assigned to a job object with * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub - * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. + * crashes or is force-killed. */ bool UseJobObject = true; uint16_t BasePortNumber = 21000; int InstanceLimit = 1000; - uint32_t InstanceHttpThreadCount = 0; // Deduce from core count - int InstanceCoreLimit = 0; // Use hardware core count + uint32_t InstanceHttpThreadCount = 0; // Automatic + int InstanceCoreLimit = 0; // Automatic + std::string InstanceMalloc; + std::string InstanceTrace; + std::string InstanceTraceHost; + std::string InstanceTraceFile; std::filesystem::path InstanceConfigPath; + std::string HydrationTargetSpecification; + CbObject HydrationOptions; + + WatchDogConfiguration WatchDog; + + ResourceMetrics ResourceLimits; + + WorkerThreadPool* OptionalProvisionWorkerPool = nullptr; + WorkerThreadPool* OptionalHydrationWorkerPool = nullptr; }; - typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc; + typedef std::function< + void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState)> + AsyncModuleStateChangeCallbackFunc; - Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {}, - ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {}); + Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {}); ~Hub(); Hub(const Hub&) = delete; Hub& operator=(const Hub&) = delete; + struct InstanceInfo + { + HubInstanceState State = HubInstanceState::Unprovisioned; + std::chrono::system_clock::time_point StateChangeTime; + ProcessMetrics Metrics; + uint16_t Port = 0; + }; + + /** + * Deprovision all running instances + */ + void Shutdown(); + + enum class EResponseCode + { + NotFound, + Rejected, + Accepted, + Completed + }; + + struct Response + { + EResponseCode ResponseCode = EResponseCode::Rejected; + std::string Message; + }; + /** * Provision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to provision. - * @param OutInfo If successful, information about the provisioned instance will be returned here. - * @param OutReason If unsuccessful, the reason will be returned here. + * @param OutInfo On success, information about the provisioned instance is returned here. */ - bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason); + Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo); /** * Deprovision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to deprovision. - * @param OutReason If unsuccessful, the reason will be returned here. - * @return true if the instance was found and deprovisioned, false otherwise. */ - bool Deprovision(const std::string& ModuleId, std::string& OutReason); + Response Deprovision(const std::string& ModuleId); + + /** + * Obliterate a storage server instance and all associated data. + * Shuts down the process, deletes backend hydration data, and cleans local state. + * + * @param ModuleId The ID of the module to obliterate. + */ + Response Obliterate(const std::string& ModuleId); /** - * Find a storage server instance for the given module ID. + * Hibernate a storage server instance for the given module ID. + * The instance is shut down but its data is preserved; it can be woken later. * - * Beware that as this returns a raw pointer to the instance, the caller must ensure - * that the instance is not deprovisioned while in use. + * @param ModuleId The ID of the module to hibernate. + */ + Response Hibernate(const std::string& ModuleId); + + /** + * Wake a hibernated storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to wake. + */ + Response Wake(const std::string& ModuleId); + + /** + * Find info about storage server instance for the given module ID. * * @param ModuleId The ID of the module to find. - * @param OutInstance If found, the instance will be returned here. + * @param OutInstanceInfo If found, the instance info will be returned here. * @return true if the instance was found, false otherwise. */ - bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr); + bool Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo = nullptr); /** - * Enumerate all storage server instances. + * Enumerate a snapshot of all storage server instances. * - * @param Callback The callback to invoke for each instance. Note that you should - * not do anything heavyweight in the callback as it is invoked while holding - * a shared lock. + * @param Callback The callback to invoke for each instance. */ - void EnumerateModules(std::function<void(StorageServerInstance&)> Callback); + void EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback); int GetInstanceCount(); - int GetMaxInstanceCount() const { return m_MaxInstanceCount; } + int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); } + + void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const; + + bool IsInstancePort(uint16_t Port) const; const Configuration& GetConfig() const { return m_Config; } +#if ZEN_WITH_TESTS + void TerminateModuleForTesting(const std::string& ModuleId); +#endif + private: const Configuration m_Config; ZenServerEnvironment m_RunEnvironment; + WorkerThreadPool* m_WorkerPool = nullptr; + Latch m_BackgroundWorkLatch; + std::atomic<bool> m_ShutdownFlag = false; - ProvisionModuleCallbackFunc m_ProvisionedModuleCallback; - ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback; + AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; - std::filesystem::path m_FileHydrationPath; - std::filesystem::path m_HydrationTempPath; + std::unique_ptr<HydrationBase> m_Hydration; + std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif - RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; - std::unordered_set<std::string> m_DeprovisioningModules; - std::unordered_set<std::string> m_ProvisioningModules; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - int m_MaxInstanceCount = 0; - std::deque<uint16_t> m_FreePorts; - - void UpdateStats(); - void UpdateCapacityMetrics(); - bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + mutable RwLock m_Lock; + std::unordered_map<std::string, size_t> m_InstanceLookup; + + // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes. + struct AtomicProcessMetrics + { + std::atomic<uint64_t> MemoryBytes = 0; + std::atomic<uint64_t> KernelTimeMs = 0; + std::atomic<uint64_t> UserTimeMs = 0; + std::atomic<uint64_t> WorkingSetSize = 0; + std::atomic<uint64_t> PeakWorkingSetSize = 0; + std::atomic<uint64_t> PagefileUsage = 0; + std::atomic<uint64_t> PeakPagefileUsage = 0; + + ProcessMetrics Load() const; + void Store(const ProcessMetrics& Metrics); + void Reset(); + }; + + struct ActiveInstance + { + // Invariant: Instance == nullptr if and only if State == Unprovisioned. + // Both fields are only created/destroyed under the hub's exclusive lock. + // State is an atomic because the watchdog reads it under a shared instance lock + // without holding the hub lock. + std::unique_ptr<StorageServerInstance> Instance; + std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; + + // Process metrics - written by WatchDog (inside instance shared lock), read lock-free. + AtomicProcessMetrics ProcessMetrics; + + // Activity tracking - written by WatchDog, reset on every state transition. + std::atomic<uint64_t> LastKnownActivitySum = 0; + std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min(); + + // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules. + std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min(); + }; + + // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive + // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State. + // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below. + + HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState) + { + ZEN_UNUSED(HubLock); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState); + + std::vector<ActiveInstance> m_ActiveInstances; + std::deque<size_t> m_FreeActiveInstanceIndexes; + SystemMetrics m_SystemMetrics; + DiskSpace m_DiskSpace; + std::atomic<int> m_MaxInstanceCount = 0; + std::thread m_WatchDog; + std::unordered_set<std::string> m_ObliteratingInstances; + + Event m_WatchDogEvent; + void WatchDog(); + void UpdateMachineMetrics(); + bool CheckInstanceStatus(HttpClient& ActivityHttpClient, + StorageServerInstance::SharedLockedPtr&& LockedInstance, + size_t ActiveInstanceIndex); + void AttemptRecoverInstance(std::string_view ModuleId); + + bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason); + uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const; + + Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate); + void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId); + void ObliterateBackendData(std::string_view ModuleId); + + // Notifications may fire slightly out of sync with the Hub's internal State flag. + // The guarantee is that notifications are sent in the correct order, but the State + // flag may be updated either before or after the notification fires depending on the + // code path. Callers must not assume a specific ordering between the two. + void NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri); }; #if ZEN_WITH_TESTS |