diff options
Diffstat (limited to 'src/zenserver/hub/hub.h')
| -rw-r--r-- | src/zenserver/hub/hub.h | 234 |
1 files changed, 166 insertions, 68 deletions
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 28e77e729..40d046ce0 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -4,10 +4,14 @@ #include "hubinstancestate.h" #include "resourcemetrics.h" +#include "storageserverinstance.h" +#include <zencore/compactbinary.h> +#include <zencore/filesystem.h> #include <zencore/system.h> #include <zenutil/zenserverprocess.h> +#include <chrono> #include <deque> #include <filesystem> #include <functional> @@ -18,7 +22,8 @@ namespace zen { -class StorageServerInstance; +class HttpClient; +class WorkerThreadPool; /** * Hub @@ -35,6 +40,19 @@ struct HubProvisionedInstanceInfo class Hub { public: + struct WatchDogConfiguration + { + std::chrono::milliseconds CycleInterval = std::chrono::seconds(3); + std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500); + std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5); + std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10); + std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30); + std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1); + + std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100); + std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200); + }; + struct Configuration { /** Enable or disable the use of a Windows Job Object for child process management. @@ -49,8 +67,20 @@ public: uint32_t InstanceHttpThreadCount = 0; // Automatic int InstanceCoreLimit = 0; // Automatic + std::string InstanceMalloc; + std::string InstanceTrace; + std::string InstanceTraceHost; + std::string InstanceTraceFile; std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; + CbObject HydrationOptions; + + WatchDogConfiguration WatchDog; + + ResourceMetrics ResourceLimits; + + WorkerThreadPool* OptionalProvisionWorkerPool = nullptr; + WorkerThreadPool* OptionalHydrationWorkerPool = nullptr; }; typedef std::function< @@ -68,7 +98,7 @@ public: struct InstanceInfo { HubInstanceState State = HubInstanceState::Unprovisioned; - std::chrono::system_clock::time_point ProvisionTime; + std::chrono::system_clock::time_point StateChangeTime; ProcessMetrics Metrics; uint16_t Port = 0; }; @@ -78,42 +108,57 @@ public: */ void Shutdown(); + enum class EResponseCode + { + NotFound, + Rejected, + Accepted, + Completed + }; + + struct Response + { + EResponseCode ResponseCode = EResponseCode::Rejected; + std::string Message; + }; + /** * Provision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to provision. - * @param OutInfo If successful, information about the provisioned instance will be returned here. - * @param OutReason If unsuccessful, the reason will be returned here. + * @param OutInfo On success, information about the provisioned instance is returned here. */ - bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason); + Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo); /** * Deprovision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to deprovision. - * @param OutReason If unsuccessful, the reason will be returned here. - * @return true if the instance was found and deprovisioned, false otherwise. */ - bool Deprovision(const std::string& ModuleId, std::string& OutReason); + Response Deprovision(const std::string& ModuleId); + + /** + * Obliterate a storage server instance and all associated data. + * Shuts down the process, deletes backend hydration data, and cleans local state. + * + * @param ModuleId The ID of the module to obliterate. + */ + Response Obliterate(const std::string& ModuleId); /** * Hibernate a storage server instance for the given module ID. * The instance is shut down but its data is preserved; it can be woken later. * * @param ModuleId The ID of the module to hibernate. - * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). - * @return true if the instance was hibernated, false otherwise. */ - bool Hibernate(const std::string& ModuleId, std::string& OutReason); + Response Hibernate(const std::string& ModuleId); /** * Wake a hibernated storage server instance for the given module ID. * * @param ModuleId The ID of the module to wake. - * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). - * @return true if the instance was woken, false otherwise. */ - bool Wake(const std::string& ModuleId, std::string& OutReason); + Response Wake(const std::string& ModuleId); /** * Find info about storage server instance for the given module ID. @@ -135,6 +180,10 @@ public: int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); } + void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const; + + bool IsInstancePort(uint16_t Port) const; + const Configuration& GetConfig() const { return m_Config; } #if ZEN_WITH_TESTS @@ -144,73 +193,122 @@ public: private: const Configuration m_Config; ZenServerEnvironment m_RunEnvironment; + WorkerThreadPool* m_WorkerPool = nullptr; + Latch m_BackgroundWorkLatch; + std::atomic<bool> m_ShutdownFlag = false; AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; - std::string m_HydrationTargetSpecification; - std::filesystem::path m_HydrationTempPath; + std::unique_ptr<HydrationBase> m_Hydration; + std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif - RwLock m_Lock; - std::unordered_map<std::string, size_t> m_InstanceLookup; - std::unordered_set<std::string> m_DeprovisioningModules; - std::unordered_set<std::string> m_ProvisioningModules; - std::unordered_set<std::string> m_HibernatingModules; - std::unordered_set<std::string> m_WakingModules; - std::unordered_set<std::string> m_RecoveringModules; - std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances; - std::vector<size_t> m_FreeActiveInstanceIndexes; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - std::atomic<int> m_MaxInstanceCount = 0; - std::deque<uint16_t> m_FreePorts; - std::thread m_WatchDog; - - Event m_WatchDogEvent; - void WatchDog(); - void AttemptRecoverInstance(std::string_view ModuleId); + mutable RwLock m_Lock; + std::unordered_map<std::string, size_t> m_InstanceLookup; - void UpdateStats(); - void UpdateCapacityMetrics(); - bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); + // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes. + struct AtomicProcessMetrics + { + std::atomic<uint64_t> MemoryBytes = 0; + std::atomic<uint64_t> KernelTimeMs = 0; + std::atomic<uint64_t> UserTimeMs = 0; + std::atomic<uint64_t> WorkingSetSize = 0; + std::atomic<uint64_t> PeakWorkingSetSize = 0; + std::atomic<uint64_t> PagefileUsage = 0; + std::atomic<uint64_t> PeakPagefileUsage = 0; + + ProcessMetrics Load() const; + void Store(const ProcessMetrics& Metrics); + void Reset(); + }; - class InstanceStateUpdateGuard + struct ActiveInstance { - public: - InstanceStateUpdateGuard(Hub& InHub, - std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - const std::string& BaseUri) - : m_Hub(InHub) - , m_ModuleId(ModuleId) - , m_OldState(OldState) - , m_NewState(NewState) - , m_BasePort(BasePort) - , m_BaseUri(BaseUri) - { - } - ~InstanceStateUpdateGuard() { m_Hub.OnStateUpdate(m_ModuleId, m_OldState, m_NewState, m_BasePort, m_BaseUri); } - - private: - Hub& m_Hub; - const std::string m_ModuleId; - HubInstanceState m_OldState; - HubInstanceState& m_NewState; - uint16_t m_BasePort; - const std::string m_BaseUri; + // Invariant: Instance == nullptr if and only if State == Unprovisioned. + // Both fields are only created/destroyed under the hub's exclusive lock. + // State is an atomic because the watchdog reads it under a shared instance lock + // without holding the hub lock. + std::unique_ptr<StorageServerInstance> Instance; + std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; + + // Process metrics - written by WatchDog (inside instance shared lock), read lock-free. + AtomicProcessMetrics ProcessMetrics; + + // Activity tracking - written by WatchDog, reset on every state transition. + std::atomic<uint64_t> LastKnownActivitySum = 0; + std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min(); + + // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules. + std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min(); }; - void OnStateUpdate(std::string_view ModuleId, - HubInstanceState OldState, - HubInstanceState& NewState, - uint16_t BasePort, - std::string_view BaseUri); + // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive + // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State. + // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below. + + HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState NewState) + { + ZEN_ASSERT(Instance); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState) + { + ZEN_UNUSED(HubLock); + return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState); + } + HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState); + + std::vector<ActiveInstance> m_ActiveInstances; + std::deque<size_t> m_FreeActiveInstanceIndexes; + SystemMetrics m_SystemMetrics; + DiskSpace m_DiskSpace; + std::atomic<int> m_MaxInstanceCount = 0; + std::thread m_WatchDog; + std::unordered_set<std::string> m_ObliteratingInstances; + + Event m_WatchDogEvent; + void WatchDog(); + void UpdateMachineMetrics(); + bool CheckInstanceStatus(HttpClient& ActivityHttpClient, + StorageServerInstance::SharedLockedPtr&& LockedInstance, + size_t ActiveInstanceIndex); + void AttemptRecoverInstance(std::string_view ModuleId); - friend class InstanceStateUpdateGuard; + bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason); + uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const; + + Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate); + void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, + size_t ActiveInstanceIndex, + HubInstanceState OldState, + bool IsNewInstance); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState); + void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId); + void ObliterateBackendData(std::string_view ModuleId); + + // Notifications may fire slightly out of sync with the Hub's internal State flag. + // The guarantee is that notifications are sent in the correct order, but the State + // flag may be updated either before or after the notification fires depending on the + // code path. Callers must not assume a specific ordering between the two. + void NotifyStateUpdate(std::string_view ModuleId, + HubInstanceState OldState, + HubInstanceState NewState, + uint16_t BasePort, + std::string_view BaseUri); }; #if ZEN_WITH_TESTS |