aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hub.h')
-rw-r--r--src/zenserver/hub/hub.h234
1 files changed, 166 insertions, 68 deletions
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 28e77e729..40d046ce0 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -4,10 +4,14 @@
#include "hubinstancestate.h"
#include "resourcemetrics.h"
+#include "storageserverinstance.h"
+#include <zencore/compactbinary.h>
+#include <zencore/filesystem.h>
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
+#include <chrono>
#include <deque>
#include <filesystem>
#include <functional>
@@ -18,7 +22,8 @@
namespace zen {
-class StorageServerInstance;
+class HttpClient;
+class WorkerThreadPool;
/**
* Hub
@@ -35,6 +40,19 @@ struct HubProvisionedInstanceInfo
class Hub
{
public:
+ struct WatchDogConfiguration
+ {
+ std::chrono::milliseconds CycleInterval = std::chrono::seconds(3);
+ std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500);
+ std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5);
+ std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10);
+ std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30);
+ std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1);
+
+ std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100);
+ std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200);
+ };
+
struct Configuration
{
/** Enable or disable the use of a Windows Job Object for child process management.
@@ -49,8 +67,20 @@ public:
uint32_t InstanceHttpThreadCount = 0; // Automatic
int InstanceCoreLimit = 0; // Automatic
+ std::string InstanceMalloc;
+ std::string InstanceTrace;
+ std::string InstanceTraceHost;
+ std::string InstanceTraceFile;
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
+ CbObject HydrationOptions;
+
+ WatchDogConfiguration WatchDog;
+
+ ResourceMetrics ResourceLimits;
+
+ WorkerThreadPool* OptionalProvisionWorkerPool = nullptr;
+ WorkerThreadPool* OptionalHydrationWorkerPool = nullptr;
};
typedef std::function<
@@ -68,7 +98,7 @@ public:
struct InstanceInfo
{
HubInstanceState State = HubInstanceState::Unprovisioned;
- std::chrono::system_clock::time_point ProvisionTime;
+ std::chrono::system_clock::time_point StateChangeTime;
ProcessMetrics Metrics;
uint16_t Port = 0;
};
@@ -78,42 +108,57 @@ public:
*/
void Shutdown();
+ enum class EResponseCode
+ {
+ NotFound,
+ Rejected,
+ Accepted,
+ Completed
+ };
+
+ struct Response
+ {
+ EResponseCode ResponseCode = EResponseCode::Rejected;
+ std::string Message;
+ };
+
/**
* Provision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to provision.
- * @param OutInfo If successful, information about the provisioned instance will be returned here.
- * @param OutReason If unsuccessful, the reason will be returned here.
+ * @param OutInfo On success, information about the provisioned instance is returned here.
*/
- bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason);
+ Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo);
/**
* Deprovision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to deprovision.
- * @param OutReason If unsuccessful, the reason will be returned here.
- * @return true if the instance was found and deprovisioned, false otherwise.
*/
- bool Deprovision(const std::string& ModuleId, std::string& OutReason);
+ Response Deprovision(const std::string& ModuleId);
+
+ /**
+ * Obliterate a storage server instance and all associated data.
+ * Shuts down the process, deletes backend hydration data, and cleans local state.
+ *
+ * @param ModuleId The ID of the module to obliterate.
+ */
+ Response Obliterate(const std::string& ModuleId);
/**
* Hibernate a storage server instance for the given module ID.
* The instance is shut down but its data is preserved; it can be woken later.
*
* @param ModuleId The ID of the module to hibernate.
- * @param OutReason If unsuccessful, the reason will be returned here (empty = not found).
- * @return true if the instance was hibernated, false otherwise.
*/
- bool Hibernate(const std::string& ModuleId, std::string& OutReason);
+ Response Hibernate(const std::string& ModuleId);
/**
* Wake a hibernated storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to wake.
- * @param OutReason If unsuccessful, the reason will be returned here (empty = not found).
- * @return true if the instance was woken, false otherwise.
*/
- bool Wake(const std::string& ModuleId, std::string& OutReason);
+ Response Wake(const std::string& ModuleId);
/**
* Find info about storage server instance for the given module ID.
@@ -135,6 +180,10 @@ public:
int GetMaxInstanceCount() const { return m_MaxInstanceCount.load(); }
+ void GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) const;
+
+ bool IsInstancePort(uint16_t Port) const;
+
const Configuration& GetConfig() const { return m_Config; }
#if ZEN_WITH_TESTS
@@ -144,73 +193,122 @@ public:
private:
const Configuration m_Config;
ZenServerEnvironment m_RunEnvironment;
+ WorkerThreadPool* m_WorkerPool = nullptr;
+ Latch m_BackgroundWorkLatch;
+ std::atomic<bool> m_ShutdownFlag = false;
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
- std::string m_HydrationTargetSpecification;
- std::filesystem::path m_HydrationTempPath;
+ std::unique_ptr<HydrationBase> m_Hydration;
+ std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
JobObject m_JobObject;
#endif
- RwLock m_Lock;
- std::unordered_map<std::string, size_t> m_InstanceLookup;
- std::unordered_set<std::string> m_DeprovisioningModules;
- std::unordered_set<std::string> m_ProvisioningModules;
- std::unordered_set<std::string> m_HibernatingModules;
- std::unordered_set<std::string> m_WakingModules;
- std::unordered_set<std::string> m_RecoveringModules;
- std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances;
- std::vector<size_t> m_FreeActiveInstanceIndexes;
- ResourceMetrics m_ResourceLimits;
- SystemMetrics m_HostMetrics;
- std::atomic<int> m_MaxInstanceCount = 0;
- std::deque<uint16_t> m_FreePorts;
- std::thread m_WatchDog;
-
- Event m_WatchDogEvent;
- void WatchDog();
- void AttemptRecoverInstance(std::string_view ModuleId);
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, size_t> m_InstanceLookup;
- void UpdateStats();
- void UpdateCapacityMetrics();
- bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+ // Mirrors ProcessMetrics with atomic fields, enabling lock-free reads alongside watchdog writes.
+ struct AtomicProcessMetrics
+ {
+ std::atomic<uint64_t> MemoryBytes = 0;
+ std::atomic<uint64_t> KernelTimeMs = 0;
+ std::atomic<uint64_t> UserTimeMs = 0;
+ std::atomic<uint64_t> WorkingSetSize = 0;
+ std::atomic<uint64_t> PeakWorkingSetSize = 0;
+ std::atomic<uint64_t> PagefileUsage = 0;
+ std::atomic<uint64_t> PeakPagefileUsage = 0;
+
+ ProcessMetrics Load() const;
+ void Store(const ProcessMetrics& Metrics);
+ void Reset();
+ };
- class InstanceStateUpdateGuard
+ struct ActiveInstance
{
- public:
- InstanceStateUpdateGuard(Hub& InHub,
- std::string_view ModuleId,
- HubInstanceState OldState,
- HubInstanceState& NewState,
- uint16_t BasePort,
- const std::string& BaseUri)
- : m_Hub(InHub)
- , m_ModuleId(ModuleId)
- , m_OldState(OldState)
- , m_NewState(NewState)
- , m_BasePort(BasePort)
- , m_BaseUri(BaseUri)
- {
- }
- ~InstanceStateUpdateGuard() { m_Hub.OnStateUpdate(m_ModuleId, m_OldState, m_NewState, m_BasePort, m_BaseUri); }
-
- private:
- Hub& m_Hub;
- const std::string m_ModuleId;
- HubInstanceState m_OldState;
- HubInstanceState& m_NewState;
- uint16_t m_BasePort;
- const std::string m_BaseUri;
+ // Invariant: Instance == nullptr if and only if State == Unprovisioned.
+ // Both fields are only created/destroyed under the hub's exclusive lock.
+ // State is an atomic because the watchdog reads it under a shared instance lock
+ // without holding the hub lock.
+ std::unique_ptr<StorageServerInstance> Instance;
+ std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
+
+ // Process metrics - written by WatchDog (inside instance shared lock), read lock-free.
+ AtomicProcessMetrics ProcessMetrics;
+
+ // Activity tracking - written by WatchDog, reset on every state transition.
+ std::atomic<uint64_t> LastKnownActivitySum = 0;
+ std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min();
+
+ // Set in UpdateInstanceStateLocked on every state transition; read lock-free by Find/EnumerateModules.
+ std::atomic<std::chrono::system_clock::time_point> StateChangeTime = std::chrono::system_clock::time_point::min();
};
- void OnStateUpdate(std::string_view ModuleId,
- HubInstanceState OldState,
- HubInstanceState& NewState,
- uint16_t BasePort,
- std::string_view BaseUri);
+ // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive
+ // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State.
+ // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below.
+
+ HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState NewState)
+ {
+ ZEN_ASSERT(Instance);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceState(const StorageServerInstance::SharedLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState NewState)
+ {
+ ZEN_ASSERT(Instance);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceState(const RwLock::ExclusiveLockScope& HubLock, size_t ActiveInstanceIndex, HubInstanceState NewState)
+ {
+ ZEN_UNUSED(HubLock);
+ return UpdateInstanceStateLocked(ActiveInstanceIndex, NewState);
+ }
+ HubInstanceState UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState);
+
+ std::vector<ActiveInstance> m_ActiveInstances;
+ std::deque<size_t> m_FreeActiveInstanceIndexes;
+ SystemMetrics m_SystemMetrics;
+ DiskSpace m_DiskSpace;
+ std::atomic<int> m_MaxInstanceCount = 0;
+ std::thread m_WatchDog;
+ std::unordered_set<std::string> m_ObliteratingInstances;
+
+ Event m_WatchDogEvent;
+ void WatchDog();
+ void UpdateMachineMetrics();
+ bool CheckInstanceStatus(HttpClient& ActivityHttpClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex);
+ void AttemptRecoverInstance(std::string_view ModuleId);
- friend class InstanceStateUpdateGuard;
+ bool CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason);
+ uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;
+
+ Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate);
+ void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
+ size_t ActiveInstanceIndex,
+ HubInstanceState OldState,
+ bool IsNewInstance);
+ void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex);
+ void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState);
+ void RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId);
+ void ObliterateBackendData(std::string_view ModuleId);
+
+ // Notifications may fire slightly out of sync with the Hub's internal State flag.
+ // The guarantee is that notifications are sent in the correct order, but the State
+ // flag may be updated either before or after the notification fires depending on the
+ // code path. Callers must not assume a specific ordering between the two.
+ void NotifyStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri);
};
#if ZEN_WITH_TESTS