aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hub.h')
-rw-r--r--src/zenserver/hub/hub.h262
1 files changed, 220 insertions, 42 deletions
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 78be3eda1..40d046ce0 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -2,21 +2,28 @@
#pragma once
+#include "hubinstancestate.h"
#include "resourcemetrics.h"
+#include "storageserverinstance.h"
+#include <zencore/compactbinary.h>
+#include <zencore/filesystem.h>
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
+#include <chrono>
#include <deque>
#include <filesystem>
#include <functional>
#include <memory>
+#include <thread>
#include <unordered_map>
#include <unordered_set>
namespace zen {
-class StorageServerInstance;
+class HttpClient;
+class WorkerThreadPool;
/**
* Hub
@@ -33,104 +40,275 @@ struct HubProvisionedInstanceInfo
class Hub
{
public:
+ struct WatchDogConfiguration
+ {
+ std::chrono::milliseconds CycleInterval = std::chrono::seconds(3);
+ std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500);
+ std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5);
+ std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10);
+ std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30);
+ std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1);
+
+ std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100);
+ std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200);
+ };
+
struct Configuration
{
/** Enable or disable the use of a Windows Job Object for child process management.
* When enabled, all spawned child processes are assigned to a job object with
* JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
- * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
+ * crashes or is force-killed.
*/
bool UseJobObject = true;
uint16_t BasePortNumber = 21000;
int InstanceLimit = 1000;
- uint32_t InstanceHttpThreadCount = 0; // Deduce from core count
- int InstanceCoreLimit = 0; // Use hardware core count
+ uint32_t InstanceHttpThreadCount = 0; // Automatic
+ int InstanceCoreLimit = 0; // Automatic
+ std::string InstanceMalloc;
+ std::string InstanceTrace;
+ std::string InstanceTraceHost;
+ std::string InstanceTraceFile;
std::filesystem::path InstanceConfigPath;
+ std::string HydrationTargetSpecification;
+ CbObject HydrationOptions;
+
+ WatchDogConfiguration WatchDog;
+
+ ResourceMetrics ResourceLimits;
+
+ WorkerThreadPool* OptionalProvisionWorkerPool = nullptr;
+ WorkerThreadPool* OptionalHydrationWorkerPool = nullptr;
};
- typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc;
+ typedef std::function<
+ void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState)>
+ AsyncModuleStateChangeCallbackFunc;
- Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {},
- ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {});
+ Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
~Hub();
Hub(const Hub&) = delete;
Hub& operator=(const Hub&) = delete;
+ struct InstanceInfo
+ {
+ HubInstanceState State = HubInstanceState::Unprovisioned;
+ std::chrono::system_clock::time_point StateChangeTime;
+ ProcessMetrics Metrics;
+ uint16_t Port = 0;
+ };
+
+ /**
+ * Deprovision all running instances.
+ */
+ void Shutdown();
+
+ enum class EResponseCode
+ {
+ NotFound,
+ Rejected,
+ Accepted,
+ Completed
+ };
+
+ struct Response
+ {
+ EResponseCode ResponseCode = EResponseCode::Rejected;
+ std::string Message;
+ };
+
/**
* Provision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to provision.
- * @param OutInfo If successful, information about the provisioned instance will be returned here.
- * @param OutReason If unsuccessful, the reason will be returned here.
+ * @param OutInfo On success, information about the provisioned instance is returned here.
*/
- bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason);
+ Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo);
/**
* Deprovision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to deprovision.
- * @param OutReason If unsuccessful, the reason will be returned here.
- * @return true if the instance was found and deprovisioned, false otherwise.
*/
- bool Deprovision(const std::string& ModuleId, std::string& OutReason);
+ Response Deprovision(const std::string& ModuleId);
+
+ /**
+ * Obliterate a storage server instance and all associated data.
+ * Shuts down the process, deletes backend hydration data, and cleans local state.
+ *
+ * @param ModuleId The ID of the module to obliterate.
+ */
+ Response Obliterate(const std::string& ModuleId);
/**
- * Find a storage server instance for the given module ID.
+ * Hibernate a storage server instance for the given module ID.
+ * The instance is shut down but its data is preserved; it can be woken later.
*
- * Beware that as this returns a raw pointer to the instance, the caller must ensure
- * that the instance is not deprovisioned while in use.
+ * @param ModuleId The ID of the module to hibernate.
+ */
+ Response Hibernate(const std::string& ModuleId);
+
+ /**
+ * Wake a hibernated storage server instance for the given module ID.
+ *
+ * @param ModuleId The ID of the module to wake.
+ */
+ Response Wake(const std::string& ModuleId);
+
+ /**
+ * Find info about the storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to find.
- * @param OutInstance If found, the instance will be returned here.
+ * @param OutInstanceInfo If found, the instance info will be returned here.
* @return true if the instance was found, false otherwise.
*/
- bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr);
+ bool Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo = nullptr);
/**
- * Enumerate all storage server instances.
+ * Enumerate a snapshot of all storage server instances.
*
- * @param Callback The callback to invoke for each instance. Note that you should
- * not do anything heavyweight in the callback as it is invoked while holding
- * a shared lock.
+ * @param Callback The callback to invoke for each instance.
*/
- void EnumerateModules(std::function<void(StorageServerInstance&)> Callback);
+ void EnumerateModules(std::function<void(std::string_view ModuleId, const InstanceInfo&)> Callback);
int GetInstanceCount();
- int GetMaxInstanceCount() const { return m_MaxInstanceCount; }
+ int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); }
+
+ void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const;
+
+ bool IsInstancePort(uint16_t Port) const;
const Configuration& GetConfig() const { return m_Config; }
+#if ZEN_WITH_TESTS
+ void TerminateModuleForTesting(const std::string& ModuleId);
+#endif
+
private:
const Configuration m_Config;
ZenServerEnvironment m_RunEnvironment;
+ WorkerThreadPool* m_WorkerPool = nullptr;
+ Latch m_BackgroundWorkLatch;
+ std::atomic<bool> m_ShutdownFlag = false;
- ProvisionModuleCallbackFunc m_ProvisionedModuleCallback;
- ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback;
+ AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
- std::filesystem::path m_FileHydrationPath;
- std::filesystem::path m_HydrationTempPath;
+ std::unique_ptr<HydrationBase> m_Hydration;
+ std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
#endif
- RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
- std::unordered_set<std::string> m_DeprovisioningModules;
- std::unordered_set<std::string> m_ProvisioningModules;
- ResourceMetrics m_ResourceLimits;
- SystemMetrics m_HostMetrics;
- int m_MaxInstanceCount = 0;
- std::deque<uint16_t> m_FreePorts;
-
- void UpdateStats();
- void UpdateCapacityMetrics();
- bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, size_t> m_InstanceLookup;
+
+ // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes.
+ struct AtomicProcessMetrics
+ {
+ std::atomic<uint64_t> MemoryBytes = 0;
+ std::atomic<uint64_t> KernelTimeMs = 0;
+ std::atomic<uint64_t> UserTimeMs = 0;
+ std::atomic<uint64_t> WorkingSetSize = 0;
+ std::atomic<uint64_t> PeakWorkingSetSize = 0;
+ std::atomic<uint64_t> PagefileUsage = 0;
+ std::atomic<uint64_t> PeakPagefileUsage = 0;
+
+ ProcessMetrics Load() const;
+ void Store(const ProcessMetrics& Metrics);
+ void Reset();
+ };
+
+ struct ActiveInstance
+ {
+ // Invariant: Instance == nullptr if and only if State == Unprovisioned.
+ // Both fields are only created/destroyed under the hub's exclusive lock.
+ // State is an atomic because the watchdog reads it under a shared instance lock
+ // without holding the hub lock.
+ std::unique_ptr<StorageServerInstance> Instance;
+ std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
+
+ // Process metrics - written by WatchDog (inside instance shared lock), read lock-free.
+ AtomicProcessMetrics ProcessMetrics;
+
+ // Activity tracking - written by WatchDog, reset on every state transition.
+ std::atomic<uint64_t> LastKnownActivitySum = 0;
+ std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min();
+
+ // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules.
+ std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min();
+ };
+
+ // UpdateInstanceState is overloaded to accept either a locked instance pointer (exclusive or shared) or the hub's
+ // exclusive lock scope; the argument is a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State.
+ // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below.
+
+ HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState NewState)
+ {
+ ZEN_ASSERT(Instance);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState NewState)
+ {
+ ZEN_ASSERT(Instance);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState)
+ {
+ ZEN_UNUSED(HubLock);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState);
+
+ std::vector<ActiveInstance> m_ActiveInstances;
+ std::deque<size_t> m_FreeActiveInstanceIndexes;
+ SystemMetrics m_SystemMetrics;
+ DiskSpace m_DiskSpace;
+ std::atomic<int> m_MaxInstanceCount = 0;
+ std::thread m_WatchDog;
+ std::unordered_set<std::string> m_ObliteratingInstances;
+
+ Event m_WatchDogEvent;
+ void WatchDog();
+ void UpdateMachineMetrics();
+ bool CheckInstanceStatus(HttpClient& ActivityHttpClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex);
+ void AttemptRecoverInstance(std::string_view ModuleId);
+
+ bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason);
+ uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;
+
+ Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate);
+ void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance);
+ void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
+ void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId);
+ void ObliterateBackendData(std::string_view ModuleId);
+
+ // Notifications may fire slightly out of sync with the Hub's internal State flag.
+ // The guarantee is that notifications are sent in the correct order, but the State
+ // flag may be updated either before or after the notification fires depending on the
+ // code path. Callers must not assume a specific ordering between the two.
+ void NotifyStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri);
};
#if ZEN_WITH_TESTS