diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-10 09:40:21 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-10 09:40:21 +0100 |
| commit | 7a54bc0af3423923e30faf632c0862162e9af62d (patch) | |
| tree | 50328fc2ed5de0ac1dd6fc8649dfcef730a56681 /src | |
| parent | Merge pull request #752 from ue-foundation/lm/restrict-content-type (diff) | |
| download | zen-7a54bc0af3423923e30faf632c0862162e9af62d.tar.xz zen-7a54bc0af3423923e30faf632c0862162e9af62d.zip | |
hubservice refactor (#819)
* move Hub to separate class
* move StorageServerInstance to separate files
* refactor HttpHubService to not own Hub instance
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 379 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 118 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.cpp | 734 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.h | 17 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/resourcemetrics.h | 15 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 203 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 68 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 6 |
11 files changed, 822 insertions, 737 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp new file mode 100644 index 000000000..f1cc86de6 --- /dev/null +++ b/src/zenserver/hub/hub.cpp @@ -0,0 +1,379 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hub.h" + +#include "storageserverinstance.h" + +#include <zencore/assertfmt.h> +#include <zencore/compactbinary.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_vector.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +/////////////////////////////////////////////////////////////////////////// + +/** + * A timeline of events with sequence IDs and timestamps. Used to + * track significant events for broadcasting to listeners. + */ +class EventTimeline +{ +public: + EventTimeline() { m_Events.reserve(1024); } + + ~EventTimeline() {} + + EventTimeline(const EventTimeline&) = delete; + EventTimeline& operator=(const EventTimeline&) = delete; + + void RecordEvent(std::string_view EventTag, CbObject EventMetadata) + { + const uint64_t SequenceId = m_NextEventId++; + const auto Now = std::chrono::steady_clock::now(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); + } + + struct EventRecord + { + uint64_t SequenceId; + std::string Tag; + std::chrono::steady_clock::time_point Timestamp; + CbObject EventMetadata; + + EventRecord(uint64_t InSequenceId, + std::string_view InTag, + std::chrono::steady_clock::time_point InTimestamp, + CbObject InEventMetadata = CbObject()) + : SequenceId(InSequenceId) + , Tag(InTag) + , Timestamp(InTimestamp) + , EventMetadata(InEventMetadata) + { + } + }; + + /** + * Iterate over events that have a SequenceId greater than SinceEventId + * + * @param Callback A callable that takes a const EventRecord& + * @param SinceEventId The SequenceId to compare against + */ + void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) + { + // Hold the lock for as short a time as possible + eastl::fixed_vector<EventRecord, 128> EventsToProcess; + m_Lock.WithSharedLock([&] { + for (auto& Event : m_Events) + { + if (Event.SequenceId > SinceEventId) + { + EventsToProcess.push_back(Event); + } + } + }); + + // Now invoke the callback outside the lock + for (auto& Event : EventsToProcess) + { + Callback(Event); + } + } + + /** + * Trim events up to (and including) the given SequenceId. Intended + * to be used for cleaning up events which are not longer interesting. + * + * @param UpToEventId The SequenceId up to which events should be removed + */ + void TrimEventsUpTo(uint64_t UpToEventId) + { + RwLock::ExclusiveLockScope _(m_Lock); + auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) { + return Event.SequenceId <= UpToEventId; + }); + m_Events.erase(It, m_Events.end()); + } + +private: + std::atomic<uint64_t> m_NextEventId{0}; + + RwLock m_Lock; + std::vector<EventRecord> m_Events; +}; + +////////////////////////////////////////////////////////////////////////// + +Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject) +{ + m_HostMetrics = GetSystemMetrics(); + m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; + m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; + + m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir); + + m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); + ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); + + m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); + ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); + + // This is necessary to ensure the hub assigns a distinct port range. + // We need to do this primarily because otherwise automated tests will + // fail as the test runner will create processes in the default range. + // We should probably make this configurable or dynamic for maximum + // flexibility, and to allow running multiple hubs on the same host if + // necessary. + m_RunEnvironment.SetNextPortNumber(21000); + +#if ZEN_PLATFORM_WINDOWS + if (m_UseJobObject) + { + m_JobObject.Initialize(); + if (m_JobObject.IsValid()) + { + ZEN_INFO("Job object initialized for hub service child process management"); + } + else + { + ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash"); + } + } +#endif +} + +Hub::~Hub() +{ + try + { + ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + m_Lock.WithExclusiveLock([this] { + for (auto& [ModuleId, Instance] : m_Instances) + { + Instance->Deprovision(); + } + m_Instances.clear(); + }); + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} + +bool +Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) +{ + StorageServerInstance* Instance = nullptr; + bool IsNewInstance = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) + { + std::string Reason; + if (!CanProvisionInstance(ModuleId, /* out */ Reason)) + { + ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); + + OutReason = Reason; + + return false; + } + + IsNewInstance = true; + auto NewInstance = + std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); +#if ZEN_PLATFORM_WINDOWS + if (m_JobObject.IsValid()) + { + NewInstance->SetJobObject(&m_JobObject); + } +#endif + Instance = NewInstance.get(); + m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); + + ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + } + else + { + Instance = It->second.get(); + } + + m_ProvisioningModules.emplace(std::string(ModuleId)); + } + + ZEN_ASSERT(Instance != nullptr); + + auto RemoveProvisioningModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProvisioningModules.erase(std::string(ModuleId)); + }); + + // NOTE: this is done while not holding the lock, as provisioning may take time + // and we don't want to block other operations. We track which modules are being + // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision + // those modules while in this state. + + UpdateStats(); + + try + { + Instance->Provision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + if (IsNewInstance) + { + // Clean up + RwLock::ExclusiveLockScope _(m_Lock); + m_Instances.erase(std::string(ModuleId)); + } + return false; + } + + OutInfo.Port = Instance->GetBasePort(); + + // TODO: base URI? Would need to know what host name / IP to use + + return true; +} + +bool +Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) +{ + std::unique_ptr<StorageServerInstance> Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + { + OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); + + ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); + + return false; + } + + if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) + { + ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); + + // Not found, OutReason should be empty + return false; + } + else + { + Instance = std::move(It->second); + m_Instances.erase(It); + m_DeprovisioningModules.emplace(ModuleId); + } + } + + // The module is deprovisioned outside the lock to avoid blocking other operations. + // + // To ensure that no new provisioning can occur while we're deprovisioning, + // we add the module ID to m_DeprovisioningModules and remove it once + // deprovisioning is complete. + + auto _ = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(ModuleId); + }); + + Instance->Deprovision(); + + return true; +} + +bool +Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance) +{ + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + { + if (OutInstance) + { + *OutInstance = It->second.get(); + } + return true; + } + else if (OutInstance) + { + *OutInstance = nullptr; + } + return false; +} + +void +Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback) +{ + RwLock::SharedLockScope _(m_Lock); + for (auto& It : m_Instances) + { + Callback(*It.second); + } +} + +int +Hub::GetInstanceCount() +{ + RwLock::SharedLockScope _(m_Lock); + return gsl::narrow_cast<int>(m_Instances.size()); +} + +void +Hub::UpdateCapacityMetrics() +{ + m_HostMetrics = GetSystemMetrics(); + + // Update per-instance metrics +} + +void +Hub::UpdateStats() +{ + m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); +} + +bool +Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) +{ + if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); + + return false; + } + + if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); + + return false; + } + + if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) + { + OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); + + return false; + } + + // TODO: handle additional resource metrics + + return true; +} + +/////////////////////////////////////////////////////////////////////////// + +} // namespace zen diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h new file mode 100644 index 000000000..caf8890ff --- /dev/null +++ b/src/zenserver/hub/hub.h @@ -0,0 +1,118 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "resourcemetrics.h" + +#include <zencore/system.h> +#include <zenutil/zenserverprocess.h> + +#include <filesystem> +#include <functional> +#include <memory> +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +class StorageServerInstance; + +/** + * Hub + * + * Core logic for managing storage server instances on behalf of external clients. + */ +class Hub +{ +public: + struct Configuration + { + std::filesystem::path HubBaseDir; + std::filesystem::path ChildBaseDir; + + /** Enable or disable the use of a Windows Job Object for child process management. + * When enabled, all spawned child processes are assigned to a job object with + * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub + * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. + */ + bool UseJobObject = true; + }; + Hub(const Configuration& Config); + ~Hub(); + + Hub(const Hub&) = delete; + Hub& operator=(const Hub&) = delete; + + struct ProvisionedInstanceInfo + { + std::string BaseUri; + uint16_t Port; + }; + + /** + * Provision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to provision. + * @param OutInfo If successful, information about the provisioned instance will be returned here. + * @param OutReason If unsuccessful, the reason will be returned here. + */ + bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason); + + /** + * Deprovision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to deprovision. + * @param OutReason If unsuccessful, the reason will be returned here. + * @return true if the instance was found and deprovisioned, false otherwise. + */ + bool Deprovision(const std::string& ModuleId, std::string& OutReason); + + /** + * Find a storage server instance for the given module ID. + * + * Beware that as this returns a raw pointer to the instance, the caller must ensure + * that the instance is not deprovisioned while in use. + * + * @param ModuleId The ID of the module to find. + * @param OutInstance If found, the instance will be returned here. + * @return true if the instance was found, false otherwise. + */ + bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr); + + /** + * Enumerate all storage server instances. + * + * @param Callback The callback to invoke for each instance. Note that you should + * not do anything heavyweight in the callback as it is invoked while holding + * a shared lock. + */ + void EnumerateModules(std::function<void(StorageServerInstance&)> Callback); + + int GetInstanceCount(); + + int GetInstanceLimit() const { return m_InstanceLimit; } + int GetMaxInstanceCount() const { return m_MaxInstanceCount; } + +private: + ZenServerEnvironment m_RunEnvironment; + std::filesystem::path m_FileHydrationPath; + std::filesystem::path m_HydrationTempPath; +#if ZEN_PLATFORM_WINDOWS + JobObject m_JobObject; +#endif + RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; + std::unordered_set<std::string> m_DeprovisioningModules; + std::unordered_set<std::string> m_ProvisioningModules; + int m_MaxInstanceCount = 0; + int m_InstanceLimit = 1000; + ResourceMetrics m_ResourceLimits; + SystemMetrics m_HostMetrics; + bool m_UseJobObject = true; + + void UpdateStats(); + void UpdateCapacityMetrics(); + bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); +}; + +} // namespace zen diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp index 7b999ae20..6765340dc 100644 --- a/src/zenserver/hub/hubservice.cpp +++ b/src/zenserver/hub/hubservice.cpp @@ -2,720 +2,19 @@ #include "hubservice.h" -#include "hydration.h" +#include "hub.h" +#include "storageserverinstance.h" -#include <zencore/assertfmt.h> #include <zencore/compactbinarybuilder.h> -#include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zencore/process.h> -#include <zencore/scopeguard.h> -#include <zencore/system.h> -#include <zenutil/zenserverprocess.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <EASTL/fixed_vector.h> -#include <asio.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <unordered_map> -#include <unordered_set> namespace zen { -/////////////////////////////////////////////////////////////////////////// - -/** - * A timeline of events with sequence IDs and timestamps. Used to - * track significant events for broadcasting to listeners. - */ -class EventTimeline -{ -public: - EventTimeline() { m_Events.reserve(1024); } - - ~EventTimeline() {} - - EventTimeline(const EventTimeline&) = delete; - EventTimeline& operator=(const EventTimeline&) = delete; - - void RecordEvent(std::string_view EventTag, CbObject EventMetadata) - { - const uint64_t SequenceId = m_NextEventId++; - const auto Now = std::chrono::steady_clock::now(); - RwLock::ExclusiveLockScope _(m_Lock); - m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); - } - - struct EventRecord - { - uint64_t SequenceId; - std::string Tag; - std::chrono::steady_clock::time_point Timestamp; - CbObject EventMetadata; - - EventRecord(uint64_t InSequenceId, - std::string_view InTag, - std::chrono::steady_clock::time_point InTimestamp, - CbObject InEventMetadata = CbObject()) - : SequenceId(InSequenceId) - , Tag(InTag) - , Timestamp(InTimestamp) - , EventMetadata(InEventMetadata) - { - } - }; - - /** - * Iterate over events that have a SequenceId greater than SinceEventId - * - * @param Callback A callable that takes a const EventRecord& - * @param SinceEventId The SequenceId to compare against - */ - void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) - { - // Hold the lock for as short a time as possible - eastl::fixed_vector<EventRecord, 128> EventsToProcess; - m_Lock.WithSharedLock([&] { - for (auto& Event : m_Events) - { - if (Event.SequenceId > SinceEventId) - { - EventsToProcess.push_back(Event); - } - } - }); - - // Now invoke the callback outside the lock - for (auto& Event : EventsToProcess) - { - Callback(Event); - } - } - - /** - * Trim events up to (and including) the given SequenceId. Intended - * to be used for cleaning up events which are not longer interesting. - * - * @param UpToEventId The SequenceId up to which events should be removed - */ - void TrimEventsUpTo(uint64_t UpToEventId) - { - RwLock::ExclusiveLockScope _(m_Lock); - auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) { - return Event.SequenceId <= UpToEventId; - }); - m_Events.erase(It, m_Events.end()); - } - -private: - std::atomic<uint64_t> m_NextEventId{0}; - - RwLock m_Lock; - std::vector<EventRecord> m_Events; -}; - -////////////////////////////////////////////////////////////////////////// - -struct ResourceMetrics -{ - uint64_t DiskUsageBytes = 0; - uint64_t MemoryUsageBytes = 0; -}; - -/** - * Storage Server Instance - * - * This class manages the lifecycle of a storage server instance, and - * provides functions to query its state. There should be one instance - * per module ID. - */ -struct StorageServerInstance -{ - StorageServerInstance(ZenServerEnvironment& RunEnvironment, - std::string_view ModuleId, - std::filesystem::path FileHydrationPath, - std::filesystem::path HydrationTempPath); - ~StorageServerInstance(); - - void Provision(); - void Deprovision(); - - void Hibernate(); - void Wake(); - - const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } - - inline std::string_view GetModuleId() const { return m_ModuleId; } - inline bool IsProvisioned() const { return m_IsProvisioned.load(); } - - inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } - -#if ZEN_PLATFORM_WINDOWS - void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; } -#endif - -private: - void WakeLocked(); - RwLock m_Lock; - std::string m_ModuleId; - std::atomic<bool> m_IsProvisioned{false}; - std::atomic<bool> m_IsHibernated{false}; - ZenServerInstance m_ServerInstance; - std::filesystem::path m_BaseDir; - std::filesystem::path m_TempDir; - std::filesystem::path m_HydrationPath; - ResourceMetrics m_ResourceMetrics; -#if ZEN_PLATFORM_WINDOWS - JobObject* m_JobObject = nullptr; -#endif - - void SpawnServerProcess(); - - void Hydrate(); - void Dehydrate(); -}; - -StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, - std::string_view ModuleId, - std::filesystem::path FileHydrationPath, - std::filesystem::path HydrationTempPath) -: m_ModuleId(ModuleId) -, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) -, m_HydrationPath(FileHydrationPath) -{ - m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); - m_TempDir = HydrationTempPath / ModuleId; -} - -StorageServerInstance::~StorageServerInstance() -{ -} - -void -StorageServerInstance::SpawnServerProcess() -{ - ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); - - m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); - m_ServerInstance.SetDataDir(m_BaseDir); -#if ZEN_PLATFORM_WINDOWS - m_ServerInstance.SetJobObject(m_JobObject); -#endif - const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady(); - - ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort); - - m_ServerInstance.EnableShutdownOnDestroy(); -} - -void -StorageServerInstance::Provision() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - if (m_IsProvisioned) - { - ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); - - return; - } - - if (m_IsHibernated) - { - WakeLocked(); - } - else - { - ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); - - Hydrate(); - - SpawnServerProcess(); - } - - m_IsProvisioned = true; -} - -void -StorageServerInstance::Deprovision() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - if (!m_IsProvisioned) - { - ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); - - return; - } - - ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); - - m_ServerInstance.Shutdown(); - - Dehydrate(); - - m_IsProvisioned = false; -} - -void -StorageServerInstance::Hibernate() -{ - // Signal server to shut down, but keep data around for later wake - - RwLock::ExclusiveLockScope _(m_Lock); - - if (!m_IsProvisioned) - { - ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); - - return; - } - - if (m_IsHibernated) - { - ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); - - return; - } - - if (!m_ServerInstance.IsRunning()) - { - ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); - - // This is an unexpected state. Should consider the instance invalid? - - return; - } - - try - { - m_ServerInstance.Shutdown(); - - m_IsHibernated = true; - m_IsProvisioned = false; - - return; - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); - } -} - -void -StorageServerInstance::Wake() -{ - RwLock::ExclusiveLockScope _(m_Lock); - WakeLocked(); -} - -void -StorageServerInstance::WakeLocked() -{ - // Start server in-place using existing data - - if (!m_IsHibernated) - { - ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); - - return; - } - - ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); - - try - { - SpawnServerProcess(); - m_IsHibernated = false; - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); - - // TODO: this instance should be marked as invalid - } -} - -void -StorageServerInstance::Hydrate() -{ - HydrationConfig Config{.ServerStateDir = m_BaseDir, - .TempDir = m_TempDir, - .ModuleId = m_ModuleId, - .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); - - Hydrator->Configure(Config); - Hydrator->Hydrate(); -} - -void -StorageServerInstance::Dehydrate() -{ - HydrationConfig Config{.ServerStateDir = m_BaseDir, - .TempDir = m_TempDir, - .ModuleId = m_ModuleId, - .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; - - std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); - - Hydrator->Configure(Config); - Hydrator->Dehydrate(); -} - -////////////////////////////////////////////////////////////////////////// - -struct HttpHubService::Impl -{ - Impl(const Impl&) = delete; - Impl& operator=(const Impl&) = delete; - - Impl(); - ~Impl(); - - void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) - { - m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir); - m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); - ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); - - m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); - ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); - - // This is necessary to ensure the hub assigns a distinct port range. - // We need to do this primarily because otherwise automated tests will - // fail as the test runner will create processes in the default range. - // We should probably make this configurable or dynamic for maximum - // flexibility, and to allow running multiple hubs on the same host if - // necessary. - m_RunEnvironment.SetNextPortNumber(21000); - -#if ZEN_PLATFORM_WINDOWS - if (m_UseJobObject) - { - m_JobObject.Initialize(); - if (m_JobObject.IsValid()) - { - ZEN_INFO("Job object initialized for hub service child process management"); - } - else - { - ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash"); - } - } -#endif - } - - void Cleanup() - { - RwLock::ExclusiveLockScope _(m_Lock); - m_Instances.clear(); - } - - struct ProvisionedInstanceInfo - { - std::string BaseUri; - uint16_t Port; - }; - - /** - * Provision a storage server instance for the given module ID. - * - * @param ModuleId The ID of the module to provision. - * @param OutInfo If successful, information about the provisioned instance will be returned here. - * @param OutReason If unsuccessful, the reason will be returned here. - */ - bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) - { - StorageServerInstance* Instance = nullptr; - bool IsNewInstance = false; - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) - { - std::string Reason; - if (!CanProvisionInstance(ModuleId, /* out */ Reason)) - { - ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); - - OutReason = Reason; - - return false; - } - - IsNewInstance = true; - auto NewInstance = - std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); -#if ZEN_PLATFORM_WINDOWS - if (m_JobObject.IsValid()) - { - NewInstance->SetJobObject(&m_JobObject); - } -#endif - Instance = NewInstance.get(); - m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); - - ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); - } - else - { - Instance = It->second.get(); - } - - m_ProvisioningModules.emplace(std::string(ModuleId)); - } - - ZEN_ASSERT(Instance != nullptr); - - auto RemoveProvisioningModule = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_ProvisioningModules.erase(std::string(ModuleId)); - }); - - // NOTE: this is done while not holding the lock, as provisioning may take time - // and we don't want to block other operations. We track which modules are being - // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision - // those modules while in this state. - - UpdateStats(); - - try - { - Instance->Provision(); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - if (IsNewInstance) - { - // Clean up - RwLock::ExclusiveLockScope _(m_Lock); - m_Instances.erase(std::string(ModuleId)); - } - return false; - } - - OutInfo.Port = Instance->GetBasePort(); - - // TODO: base URI? Would need to know what host name / IP to use - - return true; - } - - /** - * Deprovision a storage server instance for the given module ID. - * - * @param ModuleId The ID of the module to deprovision. - * @param OutReason If unsuccessful, the reason will be returned here. - * @return true if the instance was found and deprovisioned, false otherwise. - */ - bool Deprovision(const std::string& ModuleId, std::string& OutReason) - { - std::unique_ptr<StorageServerInstance> Instance; - - { - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) - { - OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); - - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); - - return false; - } - - if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) - { - ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - - // Not found, OutReason should be empty - return false; - } - else - { - Instance = std::move(It->second); - m_Instances.erase(It); - m_DeprovisioningModules.emplace(ModuleId); - } - } - - // The module is deprovisioned outside the lock to avoid blocking other operations. - // - // To ensure that no new provisioning can occur while we're deprovisioning, - // we add the module ID to m_DeprovisioningModules and remove it once - // deprovisioning is complete. - - auto _ = MakeGuard([&] { - RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(ModuleId); - }); - - Instance->Deprovision(); - - return true; - } - - /** - * Find a storage server instance for the given module ID. - * - * Beware that as this returns a raw pointer to the instance, the caller must ensure - * that the instance is not deprovisioned while in use. - * - * @param ModuleId The ID of the module to find. - * @param OutInstance If found, the instance will be returned here. - * @return true if the instance was found, false otherwise. - */ - bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr) - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) - { - if (OutInstance) - { - *OutInstance = It->second.get(); - } - return true; - } - else if (OutInstance) - { - *OutInstance = nullptr; - } - return false; - } - - /** - * Enumerate all storage server instances. - * - * @param Callback The callback to invoke for each instance. Note that you should - * not do anything heavyweight in the callback as it is invoked while holding - * a shared lock. - */ - void EnumerateModules(auto&& Callback) - { - RwLock::SharedLockScope _(m_Lock); - for (auto& It : m_Instances) - { - Callback(*It.second); - } - } - - int GetInstanceCount() - { - RwLock::SharedLockScope _(m_Lock); - return gsl::narrow_cast<int>(m_Instances.size()); - } - - inline int GetInstanceLimit() { return m_InstanceLimit; } - inline int GetMaxInstanceCount() { return m_MaxInstanceCount; } - - bool m_UseJobObject = true; - -private: - ZenServerEnvironment m_RunEnvironment; - std::filesystem::path m_FileHydrationPath; - std::filesystem::path m_HydrationTempPath; -#if ZEN_PLATFORM_WINDOWS - JobObject m_JobObject; -#endif - RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; - std::unordered_set<std::string> m_DeprovisioningModules; - std::unordered_set<std::string> m_ProvisioningModules; - int m_MaxInstanceCount = 0; - void UpdateStats(); - - // Capacity tracking - - int m_InstanceLimit = 1000; - ResourceMetrics m_ResourceLimits; - SystemMetrics m_HostMetrics; - - void UpdateCapacityMetrics(); - bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); -}; - -HttpHubService::Impl::Impl() -{ - m_HostMetrics = zen::GetSystemMetrics(); - m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; - m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; -} - -HttpHubService::Impl::~Impl() -{ - try - { - ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); - - m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, Instance] : m_Instances) - { - Instance->Deprovision(); - } - m_Instances.clear(); - }); - } - catch (const std::exception& e) - { - ZEN_WARN("Exception during hub service shutdown: {}", e.what()); - } -} - -void -HttpHubService::Impl::UpdateCapacityMetrics() -{ - m_HostMetrics = zen::GetSystemMetrics(); - - // Update per-instance metrics -} - -void -HttpHubService::Impl::UpdateStats() -{ - m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); -} - -bool -HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) -{ - if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) - { - OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); - - return false; - } - - if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) - { - OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); - - return false; - } - - if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) - { - OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); - - return false; - } - - // TODO: handle additional resource metrics - - return true; -} - -/////////////////////////////////////////////////////////////////////////// - -HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique<Impl>()) +HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) { using namespace std::literals; - m_Impl->Initialize(HubBaseDir, ChildBaseDir); - m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool { for (const auto C : Str) { @@ -738,7 +37,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem [this](HttpRouterRequest& Req) { CbObjectWriter Obj; Obj.BeginArray("modules"); - m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) { + m_Hub.EnumerateModules([&Obj](StorageServerInstance& Instance) { Obj.BeginObject(); Obj << "moduleId" << Instance.GetModuleId(); Obj << "provisioned" << Instance.IsProvisioned(); @@ -775,8 +74,8 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem try { - Impl::ProvisionedInstanceInfo Info; - if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) + Hub::ProvisionedInstanceInfo Info; + if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) { CbObjectWriter Obj; Obj << "moduleId" << ModuleId; @@ -811,7 +110,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem try { - if (!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason)) + if (!m_Hub.Deprovision(std::string(ModuleId), /* out */ FailureReason)) { if (FailureReason.empty()) { @@ -843,9 +142,9 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem "stats", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; - Obj << "currentInstanceCount" << m_Impl->GetInstanceCount(); - Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount(); - Obj << "instanceLimit" << m_Impl->GetInstanceLimit(); + Obj << "currentInstanceCount" << m_Hub.GetInstanceCount(); + Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount(); + Obj << "instanceLimit" << m_Hub.GetInstanceLimit(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); }, HttpVerb::kGet); @@ -855,12 +154,6 @@ HttpHubService::~HttpHubService() { } -void -HttpHubService::SetUseJobObject(bool Enable) -{ - m_Impl->m_UseJobObject = Enable; -} - const char* HttpHubService::BaseUri() const { @@ -884,12 +177,15 @@ void HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) { StorageServerInstance* Instance = nullptr; - if (!m_Impl->Find(ModuleId, &Instance)) + if (!m_Hub.Find(ModuleId, &Instance)) { Request.WriteResponse(HttpResponseCode::NotFound); return; } + // TODO: A separate http request for the modules/{moduleid}/deprovision endpoint can be called and deprovision the instance leaving us + // with a dangling pointer... + CbObjectWriter Obj; Obj << "moduleId" << Instance->GetModuleId(); Obj << "provisioned" << Instance->IsProvisioned(); @@ -900,7 +196,7 @@ void HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) { StorageServerInstance* Instance = nullptr; - if (!m_Impl->Find(ModuleId, &Instance)) + if (!m_Hub.Find(ModuleId, &Instance)) { Request.WriteResponse(HttpResponseCode::NotFound); return; diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h index ef24bba69..d08eeea2a 100644 --- a/src/zenserver/hub/hubservice.h +++ b/src/zenserver/hub/hubservice.h @@ -4,10 +4,10 @@ #include <zenhttp/httpserver.h> -#include "hydration.h" - namespace zen { +class Hub; + /** ZenServer Hub Service * * Manages a set of storage servers on the behalf of external clients. For @@ -17,7 +17,7 @@ namespace zen { class HttpHubService : public zen::HttpService { public: - HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir); + HttpHubService(Hub& Hub); ~HttpHubService(); HttpHubService(const HttpHubService&) = delete; @@ -28,19 +28,10 @@ public: void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId); - /** Enable or disable the use of a Windows Job Object for child process management. - * When enabled, all spawned child processes are assigned to a job object with - * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub - * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows. - */ - void SetUseJobObject(bool Enable); - private: HttpRequestRouter m_Router; - struct Impl; - - std::unique_ptr<Impl> m_Impl; + Hub& m_Hub; void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 52c17fe1a..0e78f8545 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -4,6 +4,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/logging.h> namespace zen { diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index f86f2accf..95b5cfae0 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -2,7 +2,7 @@ #pragma once -#include "zenhubserver.h" +#include <filesystem> namespace zen { diff --git a/src/zenserver/hub/resourcemetrics.h b/src/zenserver/hub/resourcemetrics.h new file mode 100644 index 000000000..d9b44b603 --- /dev/null +++ b/src/zenserver/hub/resourcemetrics.h @@ -0,0 +1,15 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> + +namespace zen { + +struct ResourceMetrics +{ + uint64_t DiskUsageBytes = 0; + uint64_t MemoryUsageBytes = 0; +}; + +} // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp new file mode 100644 index 000000000..f24379715 --- /dev/null +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -0,0 +1,203 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "storageserverinstance.h" + +#include "hydration.h" + +#include <zencore/assertfmt.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +namespace zen { + +StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath) +: m_ModuleId(ModuleId) +, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) +, m_HydrationPath(FileHydrationPath) +{ + m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); + m_TempDir = HydrationTempPath / ModuleId; +} + +StorageServerInstance::~StorageServerInstance() +{ +} + +void +StorageServerInstance::SpawnServerProcess() +{ + ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); + m_ServerInstance.SetDataDir(m_BaseDir); +#if ZEN_PLATFORM_WINDOWS + m_ServerInstance.SetJobObject(m_JobObject); +#endif + const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady(); + + ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort); + + m_ServerInstance.EnableShutdownOnDestroy(); +} + +void +StorageServerInstance::Provision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_IsProvisioned) + { + ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + WakeLocked(); + } + else + { + ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); + + Hydrate(); + + SpawnServerProcess(); + } + + m_IsProvisioned = true; +} + +void +StorageServerInstance::Deprovision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); + + m_ServerInstance.Shutdown(); + + Dehydrate(); + + m_IsProvisioned = false; +} + +void +StorageServerInstance::Hibernate() +{ + // Signal server to shut down, but keep data around for later wake + + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); + + return; + } + + if (!m_ServerInstance.IsRunning()) + { + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); + + // This is an unexpected state. Should consider the instance invalid? + + return; + } + + try + { + m_ServerInstance.Shutdown(); + + m_IsHibernated = true; + m_IsProvisioned = false; + + return; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + } +} + +void +StorageServerInstance::Wake() +{ + RwLock::ExclusiveLockScope _(m_Lock); + WakeLocked(); +} + +void +StorageServerInstance::WakeLocked() +{ + // Start server in-place using existing data + + if (!m_IsHibernated) + { + ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); + + return; + } + + ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + try + { + SpawnServerProcess(); + m_IsHibernated = false; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + + // TODO: this instance should be marked as invalid + } +} + +void +StorageServerInstance::Hydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Hydrate(); +} + +void +StorageServerInstance::Dehydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Dehydrate(); +} + +} // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h new file mode 100644 index 000000000..a2f3d25d7 --- /dev/null +++ b/src/zenserver/hub/storageserverinstance.h @@ -0,0 +1,68 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "resourcemetrics.h" + +#include <zenutil/zenserverprocess.h> + +#include <atomic> +#include <filesystem> + +namespace zen { + +/** + * Storage Server Instance + * + * This class manages the lifecycle of a storage server instance, and + * provides functions to query its state. There should be one instance + * per module ID. + */ +class StorageServerInstance +{ +public: + StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath); + ~StorageServerInstance(); + + void Provision(); + void Deprovision(); + + void Hibernate(); + void Wake(); + + const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } + + inline std::string_view GetModuleId() const { return m_ModuleId; } + inline bool IsProvisioned() const { return m_IsProvisioned.load(); } + + inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } + +#if ZEN_PLATFORM_WINDOWS + void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; } +#endif + +private: + void WakeLocked(); + RwLock m_Lock; + std::string m_ModuleId; + std::atomic<bool> m_IsProvisioned{false}; + std::atomic<bool> m_IsHibernated{false}; + ZenServerInstance m_ServerInstance; + std::filesystem::path m_BaseDir; + std::filesystem::path m_TempDir; + std::filesystem::path m_HydrationPath; + ResourceMetrics m_ResourceMetrics; +#if ZEN_PLATFORM_WINDOWS + JobObject* m_JobObject = nullptr; +#endif + + void SpawnServerProcess(); + + void Hydrate(); + void Dehydrate(); +}; + +} // namespace zen diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index c6d2dc8d4..64ab0bd9e 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -1,6 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "zenhubserver.h" + +#include "frontend/frontend.h" +#include "hub.h" #include "hubservice.h" #include <zencore/fmtutils.h> @@ -9,7 +12,6 @@ #include <zencore/memory/tagtrace.h> #include <zencore/scopeguard.h> #include <zencore/sentryintegration.h> -#include <zencore/system.h> #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> @@ -116,11 +118,15 @@ ZenHubServer::Cleanup() } ShutdownServices(); - if (m_Http) { m_Http->Close(); } + + m_FrontendService.reset(); + m_HubService.reset(); + m_ApiService.reset(); + m_Hub.reset(); } catch (const std::exception& Ex) { @@ -139,11 +145,15 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); + ZEN_INFO("instantiating Hub"); + m_Hub = std::make_unique<Hub>( + Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"}); + ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); ZEN_INFO("instantiating hub service"); - m_HubService = std::make_unique<HttpHubService>(ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"); + m_HubService = std::make_unique<HttpHubService>(*m_Hub); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 4c56fdce5..a3eeb2ead 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -2,7 +2,6 @@ #pragma once -#include "frontend/frontend.h" #include "zenserver.h" namespace cxxopts { @@ -15,6 +14,7 @@ struct Options; namespace zen { class HttpApiService; +class HttpFrontendService; class HttpHubService; struct ZenHubServerConfig : public ZenServerConfig @@ -23,6 +23,8 @@ struct ZenHubServerConfig : public ZenServerConfig std::string InstanceId; // For use in notifications }; +class Hub; + struct ZenHubServerConfigurator : public ZenServerConfiguratorBase { ZenHubServerConfigurator(ZenHubServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions), m_ServerOptions(ServerOptions) @@ -82,6 +84,8 @@ private: std::filesystem::path m_ContentRoot; bool m_DebugOptionForcedCrash = false; + std::unique_ptr<Hub> m_Hub; + std::unique_ptr<HttpHubService> m_HubService; std::unique_ptr<HttpApiService> m_ApiService; std::unique_ptr<HttpFrontendService> m_FrontendService; |