aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-10 09:40:21 +0100
committerGitHub Enterprise <[email protected]>2026-03-10 09:40:21 +0100
commit7a54bc0af3423923e30faf632c0862162e9af62d (patch)
tree50328fc2ed5de0ac1dd6fc8649dfcef730a56681 /src
parentMerge pull request #752 from ue-foundation/lm/restrict-content-type (diff)
downloadzen-7a54bc0af3423923e30faf632c0862162e9af62d.tar.xz
zen-7a54bc0af3423923e30faf632c0862162e9af62d.zip
hubservice refactor (#819)
* move Hub to separate class * move StorageServerInstance to separate files * refactor HttpHubService to not own Hub instance
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/hub/hub.cpp379
-rw-r--r--src/zenserver/hub/hub.h118
-rw-r--r--src/zenserver/hub/hubservice.cpp734
-rw-r--r--src/zenserver/hub/hubservice.h17
-rw-r--r--src/zenserver/hub/hydration.cpp1
-rw-r--r--src/zenserver/hub/hydration.h2
-rw-r--r--src/zenserver/hub/resourcemetrics.h15
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp203
-rw-r--r--src/zenserver/hub/storageserverinstance.h68
-rw-r--r--src/zenserver/hub/zenhubserver.cpp16
-rw-r--r--src/zenserver/hub/zenhubserver.h6
11 files changed, 822 insertions, 737 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
new file mode 100644
index 000000000..f1cc86de6
--- /dev/null
+++ b/src/zenserver/hub/hub.cpp
@@ -0,0 +1,379 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hub.h"
+
+#include "storageserverinstance.h"
+
+#include <zencore/assertfmt.h>
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_vector.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+///////////////////////////////////////////////////////////////////////////
+
+/**
+ * A timeline of events with sequence IDs and timestamps. Used to
+ * track significant events for broadcasting to listeners.
+ */
+class EventTimeline
+{
+public:
+ EventTimeline() { m_Events.reserve(1024); }
+
+ ~EventTimeline() {}
+
+ EventTimeline(const EventTimeline&) = delete;
+ EventTimeline& operator=(const EventTimeline&) = delete;
+
+ void RecordEvent(std::string_view EventTag, CbObject EventMetadata)
+ {
+ const uint64_t SequenceId = m_NextEventId++;
+ const auto Now = std::chrono::steady_clock::now();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata));
+ }
+
+ struct EventRecord
+ {
+ uint64_t SequenceId;
+ std::string Tag;
+ std::chrono::steady_clock::time_point Timestamp;
+ CbObject EventMetadata;
+
+ EventRecord(uint64_t InSequenceId,
+ std::string_view InTag,
+ std::chrono::steady_clock::time_point InTimestamp,
+ CbObject InEventMetadata = CbObject())
+ : SequenceId(InSequenceId)
+ , Tag(InTag)
+ , Timestamp(InTimestamp)
+ , EventMetadata(InEventMetadata)
+ {
+ }
+ };
+
+ /**
+ * Iterate over events that have a SequenceId greater than SinceEventId
+ *
+ * @param Callback A callable that takes a const EventRecord&
+ * @param SinceEventId The SequenceId to compare against
+ */
+ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
+ {
+ // Hold the lock for as short a time as possible
+ eastl::fixed_vector<EventRecord, 128> EventsToProcess;
+ m_Lock.WithSharedLock([&] {
+ for (auto& Event : m_Events)
+ {
+ if (Event.SequenceId > SinceEventId)
+ {
+ EventsToProcess.push_back(Event);
+ }
+ }
+ });
+
+ // Now invoke the callback outside the lock
+ for (auto& Event : EventsToProcess)
+ {
+ Callback(Event);
+ }
+ }
+
+ /**
+ * Trim events up to (and including) the given SequenceId. Intended
+ * to be used for cleaning up events which are not longer interesting.
+ *
+ * @param UpToEventId The SequenceId up to which events should be removed
+ */
+ void TrimEventsUpTo(uint64_t UpToEventId)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
+ return Event.SequenceId <= UpToEventId;
+ });
+ m_Events.erase(It, m_Events.end());
+ }
+
+private:
+ std::atomic<uint64_t> m_NextEventId{0};
+
+ RwLock m_Lock;
+ std::vector<EventRecord> m_Events;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject)
+{
+ m_HostMetrics = GetSystemMetrics();
+ m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
+ m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
+
+ m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir);
+
+ m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
+ ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);
+
+ m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
+ ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
+
+ // This is necessary to ensure the hub assigns a distinct port range.
+ // We need to do this primarily because otherwise automated tests will
+ // fail as the test runner will create processes in the default range.
+ // We should probably make this configurable or dynamic for maximum
+ // flexibility, and to allow running multiple hubs on the same host if
+ // necessary.
+ m_RunEnvironment.SetNextPortNumber(21000);
+
+#if ZEN_PLATFORM_WINDOWS
+ if (m_UseJobObject)
+ {
+ m_JobObject.Initialize();
+ if (m_JobObject.IsValid())
+ {
+ ZEN_INFO("Job object initialized for hub service child process management");
+ }
+ else
+ {
+ ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash");
+ }
+ }
+#endif
+}
+
+Hub::~Hub()
+{
+ try
+ {
+ ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+
+ m_Lock.WithExclusiveLock([this] {
+ for (auto& [ModuleId, Instance] : m_Instances)
+ {
+ Instance->Deprovision();
+ }
+ m_Instances.clear();
+ });
+ }
+ catch (const std::exception& e)
+ {
+ ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ }
+}
+
+bool
+Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+{
+ StorageServerInstance* Instance = nullptr;
+ bool IsNewInstance = false;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
+ {
+ std::string Reason;
+ if (!CanProvisionInstance(ModuleId, /* out */ Reason))
+ {
+ ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
+
+ OutReason = Reason;
+
+ return false;
+ }
+
+ IsNewInstance = true;
+ auto NewInstance =
+ std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath);
+#if ZEN_PLATFORM_WINDOWS
+ if (m_JobObject.IsValid())
+ {
+ NewInstance->SetJobObject(&m_JobObject);
+ }
+#endif
+ Instance = NewInstance.get();
+ m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));
+
+ ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
+ }
+ else
+ {
+ Instance = It->second.get();
+ }
+
+ m_ProvisioningModules.emplace(std::string(ModuleId));
+ }
+
+ ZEN_ASSERT(Instance != nullptr);
+
+ auto RemoveProvisioningModule = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ProvisioningModules.erase(std::string(ModuleId));
+ });
+
+ // NOTE: this is done while not holding the lock, as provisioning may take time
+ // and we don't want to block other operations. We track which modules are being
+ // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
+ // those modules while in this state.
+
+ UpdateStats();
+
+ try
+ {
+ Instance->Provision();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ if (IsNewInstance)
+ {
+ // Clean up
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Instances.erase(std::string(ModuleId));
+ }
+ return false;
+ }
+
+ OutInfo.Port = Instance->GetBasePort();
+
+ // TODO: base URI? Would need to know what host name / IP to use
+
+ return true;
+}
+
+bool
+Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
+{
+ std::unique_ptr<StorageServerInstance> Instance;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
+
+ ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
+ {
+ ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
+
+ // Not found, OutReason should be empty
+ return false;
+ }
+ else
+ {
+ Instance = std::move(It->second);
+ m_Instances.erase(It);
+ m_DeprovisioningModules.emplace(ModuleId);
+ }
+ }
+
+ // The module is deprovisioned outside the lock to avoid blocking other operations.
+ //
+ // To ensure that no new provisioning can occur while we're deprovisioning,
+ // we add the module ID to m_DeprovisioningModules and remove it once
+ // deprovisioning is complete.
+
+ auto _ = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(ModuleId);
+ });
+
+ Instance->Deprovision();
+
+ return true;
+}
+
+bool
+Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ {
+ if (OutInstance)
+ {
+ *OutInstance = It->second.get();
+ }
+ return true;
+ }
+ else if (OutInstance)
+ {
+ *OutInstance = nullptr;
+ }
+ return false;
+}
+
+void
+Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& It : m_Instances)
+ {
+ Callback(*It.second);
+ }
+}
+
+int
+Hub::GetInstanceCount()
+{
+ RwLock::SharedLockScope _(m_Lock);
+ return gsl::narrow_cast<int>(m_Instances.size());
+}
+
+void
+Hub::UpdateCapacityMetrics()
+{
+ m_HostMetrics = GetSystemMetrics();
+
+ // Update per-instance metrics
+}
+
+void
+Hub::UpdateStats()
+{
+ m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
+}
+
+bool
+Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
+{
+ if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
+
+ return false;
+ }
+
+ if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit)
+ {
+ OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit);
+
+ return false;
+ }
+
+ // TODO: handle additional resource metrics
+
+ return true;
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+} // namespace zen
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
new file mode 100644
index 000000000..caf8890ff
--- /dev/null
+++ b/src/zenserver/hub/hub.h
@@ -0,0 +1,118 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "resourcemetrics.h"
+
+#include <zencore/system.h>
+#include <zenutil/zenserverprocess.h>
+
+#include <filesystem>
+#include <functional>
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+class StorageServerInstance;
+
+/**
+ * Hub
+ *
+ * Core logic for managing storage server instances on behalf of external clients.
+ */
+class Hub
+{
+public:
+ struct Configuration
+ {
+ std::filesystem::path HubBaseDir;
+ std::filesystem::path ChildBaseDir;
+
+ /** Enable or disable the use of a Windows Job Object for child process management.
+ * When enabled, all spawned child processes are assigned to a job object with
+ * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
+ * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
+ */
+ bool UseJobObject = true;
+ };
+ Hub(const Configuration& Config);
+ ~Hub();
+
+ Hub(const Hub&) = delete;
+ Hub& operator=(const Hub&) = delete;
+
+ struct ProvisionedInstanceInfo
+ {
+ std::string BaseUri;
+ uint16_t Port;
+ };
+
+ /**
+ * Provision a storage server instance for the given module ID.
+ *
+ * @param ModuleId The ID of the module to provision.
+ * @param OutInfo If successful, information about the provisioned instance will be returned here.
+ * @param OutReason If unsuccessful, the reason will be returned here.
+ */
+ bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason);
+
+ /**
+ * Deprovision a storage server instance for the given module ID.
+ *
+ * @param ModuleId The ID of the module to deprovision.
+ * @param OutReason If unsuccessful, the reason will be returned here.
+ * @return true if the instance was found and deprovisioned, false otherwise.
+ */
+ bool Deprovision(const std::string& ModuleId, std::string& OutReason);
+
+ /**
+ * Find a storage server instance for the given module ID.
+ *
+ * Beware that as this returns a raw pointer to the instance, the caller must ensure
+ * that the instance is not deprovisioned while in use.
+ *
+ * @param ModuleId The ID of the module to find.
+ * @param OutInstance If found, the instance will be returned here.
+ * @return true if the instance was found, false otherwise.
+ */
+ bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr);
+
+ /**
+ * Enumerate all storage server instances.
+ *
+ * @param Callback The callback to invoke for each instance. Note that you should
+ * not do anything heavyweight in the callback as it is invoked while holding
+ * a shared lock.
+ */
+ void EnumerateModules(std::function<void(StorageServerInstance&)> Callback);
+
+ int GetInstanceCount();
+
+ int GetInstanceLimit() const { return m_InstanceLimit; }
+ int GetMaxInstanceCount() const { return m_MaxInstanceCount; }
+
+private:
+ ZenServerEnvironment m_RunEnvironment;
+ std::filesystem::path m_FileHydrationPath;
+ std::filesystem::path m_HydrationTempPath;
+#if ZEN_PLATFORM_WINDOWS
+ JobObject m_JobObject;
+#endif
+ RwLock m_Lock;
+ std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
+ std::unordered_set<std::string> m_DeprovisioningModules;
+ std::unordered_set<std::string> m_ProvisioningModules;
+ int m_MaxInstanceCount = 0;
+ int m_InstanceLimit = 1000;
+ ResourceMetrics m_ResourceLimits;
+ SystemMetrics m_HostMetrics;
+ bool m_UseJobObject = true;
+
+ void UpdateStats();
+ void UpdateCapacityMetrics();
+ bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+};
+
+} // namespace zen
diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp
index 7b999ae20..6765340dc 100644
--- a/src/zenserver/hub/hubservice.cpp
+++ b/src/zenserver/hub/hubservice.cpp
@@ -2,720 +2,19 @@
#include "hubservice.h"
-#include "hydration.h"
+#include "hub.h"
+#include "storageserverinstance.h"
-#include <zencore/assertfmt.h>
#include <zencore/compactbinarybuilder.h>
-#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zencore/process.h>
-#include <zencore/scopeguard.h>
-#include <zencore/system.h>
-#include <zenutil/zenserverprocess.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <EASTL/fixed_vector.h>
-#include <asio.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <unordered_map>
-#include <unordered_set>
namespace zen {
-///////////////////////////////////////////////////////////////////////////
-
-/**
- * A timeline of events with sequence IDs and timestamps. Used to
- * track significant events for broadcasting to listeners.
- */
-class EventTimeline
-{
-public:
- EventTimeline() { m_Events.reserve(1024); }
-
- ~EventTimeline() {}
-
- EventTimeline(const EventTimeline&) = delete;
- EventTimeline& operator=(const EventTimeline&) = delete;
-
- void RecordEvent(std::string_view EventTag, CbObject EventMetadata)
- {
- const uint64_t SequenceId = m_NextEventId++;
- const auto Now = std::chrono::steady_clock::now();
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata));
- }
-
- struct EventRecord
- {
- uint64_t SequenceId;
- std::string Tag;
- std::chrono::steady_clock::time_point Timestamp;
- CbObject EventMetadata;
-
- EventRecord(uint64_t InSequenceId,
- std::string_view InTag,
- std::chrono::steady_clock::time_point InTimestamp,
- CbObject InEventMetadata = CbObject())
- : SequenceId(InSequenceId)
- , Tag(InTag)
- , Timestamp(InTimestamp)
- , EventMetadata(InEventMetadata)
- {
- }
- };
-
- /**
- * Iterate over events that have a SequenceId greater than SinceEventId
- *
- * @param Callback A callable that takes a const EventRecord&
- * @param SinceEventId The SequenceId to compare against
- */
- void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
- {
- // Hold the lock for as short a time as possible
- eastl::fixed_vector<EventRecord, 128> EventsToProcess;
- m_Lock.WithSharedLock([&] {
- for (auto& Event : m_Events)
- {
- if (Event.SequenceId > SinceEventId)
- {
- EventsToProcess.push_back(Event);
- }
- }
- });
-
- // Now invoke the callback outside the lock
- for (auto& Event : EventsToProcess)
- {
- Callback(Event);
- }
- }
-
- /**
- * Trim events up to (and including) the given SequenceId. Intended
- * to be used for cleaning up events which are not longer interesting.
- *
- * @param UpToEventId The SequenceId up to which events should be removed
- */
- void TrimEventsUpTo(uint64_t UpToEventId)
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
- return Event.SequenceId <= UpToEventId;
- });
- m_Events.erase(It, m_Events.end());
- }
-
-private:
- std::atomic<uint64_t> m_NextEventId{0};
-
- RwLock m_Lock;
- std::vector<EventRecord> m_Events;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-struct ResourceMetrics
-{
- uint64_t DiskUsageBytes = 0;
- uint64_t MemoryUsageBytes = 0;
-};
-
-/**
- * Storage Server Instance
- *
- * This class manages the lifecycle of a storage server instance, and
- * provides functions to query its state. There should be one instance
- * per module ID.
- */
-struct StorageServerInstance
-{
- StorageServerInstance(ZenServerEnvironment& RunEnvironment,
- std::string_view ModuleId,
- std::filesystem::path FileHydrationPath,
- std::filesystem::path HydrationTempPath);
- ~StorageServerInstance();
-
- void Provision();
- void Deprovision();
-
- void Hibernate();
- void Wake();
-
- const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; }
-
- inline std::string_view GetModuleId() const { return m_ModuleId; }
- inline bool IsProvisioned() const { return m_IsProvisioned.load(); }
-
- inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); }
-
-#if ZEN_PLATFORM_WINDOWS
- void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; }
-#endif
-
-private:
- void WakeLocked();
- RwLock m_Lock;
- std::string m_ModuleId;
- std::atomic<bool> m_IsProvisioned{false};
- std::atomic<bool> m_IsHibernated{false};
- ZenServerInstance m_ServerInstance;
- std::filesystem::path m_BaseDir;
- std::filesystem::path m_TempDir;
- std::filesystem::path m_HydrationPath;
- ResourceMetrics m_ResourceMetrics;
-#if ZEN_PLATFORM_WINDOWS
- JobObject* m_JobObject = nullptr;
-#endif
-
- void SpawnServerProcess();
-
- void Hydrate();
- void Dehydrate();
-};
-
-StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
- std::string_view ModuleId,
- std::filesystem::path FileHydrationPath,
- std::filesystem::path HydrationTempPath)
-: m_ModuleId(ModuleId)
-, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
-, m_HydrationPath(FileHydrationPath)
-{
- m_BaseDir = RunEnvironment.CreateChildDir(ModuleId);
- m_TempDir = HydrationTempPath / ModuleId;
-}
-
-StorageServerInstance::~StorageServerInstance()
-{
-}
-
-void
-StorageServerInstance::SpawnServerProcess()
-{
- ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
-
- m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath());
- m_ServerInstance.SetDataDir(m_BaseDir);
-#if ZEN_PLATFORM_WINDOWS
- m_ServerInstance.SetJobObject(m_JobObject);
-#endif
- const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady();
-
- ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort);
-
- m_ServerInstance.EnableShutdownOnDestroy();
-}
-
-void
-StorageServerInstance::Provision()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (m_IsProvisioned)
- {
- ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId);
-
- return;
- }
-
- if (m_IsHibernated)
- {
- WakeLocked();
- }
- else
- {
- ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
-
- Hydrate();
-
- SpawnServerProcess();
- }
-
- m_IsProvisioned = true;
-}
-
-void
-StorageServerInstance::Deprovision()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (!m_IsProvisioned)
- {
- ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId);
-
- return;
- }
-
- ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
-
- m_ServerInstance.Shutdown();
-
- Dehydrate();
-
- m_IsProvisioned = false;
-}
-
-void
-StorageServerInstance::Hibernate()
-{
- // Signal server to shut down, but keep data around for later wake
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (!m_IsProvisioned)
- {
- ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId);
-
- return;
- }
-
- if (m_IsHibernated)
- {
- ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId);
-
- return;
- }
-
- if (!m_ServerInstance.IsRunning())
- {
- ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId);
-
- // This is an unexpected state. Should consider the instance invalid?
-
- return;
- }
-
- try
- {
- m_ServerInstance.Shutdown();
-
- m_IsHibernated = true;
- m_IsProvisioned = false;
-
- return;
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what());
- }
-}
-
-void
-StorageServerInstance::Wake()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- WakeLocked();
-}
-
-void
-StorageServerInstance::WakeLocked()
-{
- // Start server in-place using existing data
-
- if (!m_IsHibernated)
- {
- ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId);
-
- return;
- }
-
- ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
-
- try
- {
- SpawnServerProcess();
- m_IsHibernated = false;
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what());
-
- // TODO: this instance should be marked as invalid
- }
-}
-
-void
-StorageServerInstance::Hydrate()
-{
- HydrationConfig Config{.ServerStateDir = m_BaseDir,
- .TempDir = m_TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
-
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
-
- Hydrator->Configure(Config);
- Hydrator->Hydrate();
-}
-
-void
-StorageServerInstance::Dehydrate()
-{
- HydrationConfig Config{.ServerStateDir = m_BaseDir,
- .TempDir = m_TempDir,
- .ModuleId = m_ModuleId,
- .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
-
- std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
-
- Hydrator->Configure(Config);
- Hydrator->Dehydrate();
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-struct HttpHubService::Impl
-{
- Impl(const Impl&) = delete;
- Impl& operator=(const Impl&) = delete;
-
- Impl();
- ~Impl();
-
- void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir)
- {
- m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir);
- m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
- ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);
-
- m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
- ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
-
- // This is necessary to ensure the hub assigns a distinct port range.
- // We need to do this primarily because otherwise automated tests will
- // fail as the test runner will create processes in the default range.
- // We should probably make this configurable or dynamic for maximum
- // flexibility, and to allow running multiple hubs on the same host if
- // necessary.
- m_RunEnvironment.SetNextPortNumber(21000);
-
-#if ZEN_PLATFORM_WINDOWS
- if (m_UseJobObject)
- {
- m_JobObject.Initialize();
- if (m_JobObject.IsValid())
- {
- ZEN_INFO("Job object initialized for hub service child process management");
- }
- else
- {
- ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash");
- }
- }
-#endif
- }
-
- void Cleanup()
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Instances.clear();
- }
-
- struct ProvisionedInstanceInfo
- {
- std::string BaseUri;
- uint16_t Port;
- };
-
- /**
- * Provision a storage server instance for the given module ID.
- *
- * @param ModuleId The ID of the module to provision.
- * @param OutInfo If successful, information about the provisioned instance will be returned here.
- * @param OutReason If unsuccessful, the reason will be returned here.
- */
- bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason)
- {
- StorageServerInstance* Instance = nullptr;
- bool IsNewInstance = false;
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
- {
- std::string Reason;
- if (!CanProvisionInstance(ModuleId, /* out */ Reason))
- {
- ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
-
- OutReason = Reason;
-
- return false;
- }
-
- IsNewInstance = true;
- auto NewInstance =
- std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath);
-#if ZEN_PLATFORM_WINDOWS
- if (m_JobObject.IsValid())
- {
- NewInstance->SetJobObject(&m_JobObject);
- }
-#endif
- Instance = NewInstance.get();
- m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));
-
- ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
- }
- else
- {
- Instance = It->second.get();
- }
-
- m_ProvisioningModules.emplace(std::string(ModuleId));
- }
-
- ZEN_ASSERT(Instance != nullptr);
-
- auto RemoveProvisioningModule = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_ProvisioningModules.erase(std::string(ModuleId));
- });
-
- // NOTE: this is done while not holding the lock, as provisioning may take time
- // and we don't want to block other operations. We track which modules are being
- // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
- // those modules while in this state.
-
- UpdateStats();
-
- try
- {
- Instance->Provision();
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
- if (IsNewInstance)
- {
- // Clean up
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Instances.erase(std::string(ModuleId));
- }
- return false;
- }
-
- OutInfo.Port = Instance->GetBasePort();
-
- // TODO: base URI? Would need to know what host name / IP to use
-
- return true;
- }
-
- /**
- * Deprovision a storage server instance for the given module ID.
- *
- * @param ModuleId The ID of the module to deprovision.
- * @param OutReason If unsuccessful, the reason will be returned here.
- * @return true if the instance was found and deprovisioned, false otherwise.
- */
- bool Deprovision(const std::string& ModuleId, std::string& OutReason)
- {
- std::unique_ptr<StorageServerInstance> Instance;
-
- {
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
- {
- OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
-
- ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
-
- return false;
- }
-
- if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
- {
- ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
-
- // Not found, OutReason should be empty
- return false;
- }
- else
- {
- Instance = std::move(It->second);
- m_Instances.erase(It);
- m_DeprovisioningModules.emplace(ModuleId);
- }
- }
-
- // The module is deprovisioned outside the lock to avoid blocking other operations.
- //
- // To ensure that no new provisioning can occur while we're deprovisioning,
- // we add the module ID to m_DeprovisioningModules and remove it once
- // deprovisioning is complete.
-
- auto _ = MakeGuard([&] {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(ModuleId);
- });
-
- Instance->Deprovision();
-
- return true;
- }
-
- /**
- * Find a storage server instance for the given module ID.
- *
- * Beware that as this returns a raw pointer to the instance, the caller must ensure
- * that the instance is not deprovisioned while in use.
- *
- * @param ModuleId The ID of the module to find.
- * @param OutInstance If found, the instance will be returned here.
- * @return true if the instance was found, false otherwise.
- */
- bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr)
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
- {
- if (OutInstance)
- {
- *OutInstance = It->second.get();
- }
- return true;
- }
- else if (OutInstance)
- {
- *OutInstance = nullptr;
- }
- return false;
- }
-
- /**
- * Enumerate all storage server instances.
- *
- * @param Callback The callback to invoke for each instance. Note that you should
- * not do anything heavyweight in the callback as it is invoked while holding
- * a shared lock.
- */
- void EnumerateModules(auto&& Callback)
- {
- RwLock::SharedLockScope _(m_Lock);
- for (auto& It : m_Instances)
- {
- Callback(*It.second);
- }
- }
-
- int GetInstanceCount()
- {
- RwLock::SharedLockScope _(m_Lock);
- return gsl::narrow_cast<int>(m_Instances.size());
- }
-
- inline int GetInstanceLimit() { return m_InstanceLimit; }
- inline int GetMaxInstanceCount() { return m_MaxInstanceCount; }
-
- bool m_UseJobObject = true;
-
-private:
- ZenServerEnvironment m_RunEnvironment;
- std::filesystem::path m_FileHydrationPath;
- std::filesystem::path m_HydrationTempPath;
-#if ZEN_PLATFORM_WINDOWS
- JobObject m_JobObject;
-#endif
- RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
- std::unordered_set<std::string> m_DeprovisioningModules;
- std::unordered_set<std::string> m_ProvisioningModules;
- int m_MaxInstanceCount = 0;
- void UpdateStats();
-
- // Capacity tracking
-
- int m_InstanceLimit = 1000;
- ResourceMetrics m_ResourceLimits;
- SystemMetrics m_HostMetrics;
-
- void UpdateCapacityMetrics();
- bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
-};
-
-HttpHubService::Impl::Impl()
-{
- m_HostMetrics = zen::GetSystemMetrics();
- m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
- m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
-}
-
-HttpHubService::Impl::~Impl()
-{
- try
- {
- ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
-
- m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, Instance] : m_Instances)
- {
- Instance->Deprovision();
- }
- m_Instances.clear();
- });
- }
- catch (const std::exception& e)
- {
- ZEN_WARN("Exception during hub service shutdown: {}", e.what());
- }
-}
-
-void
-HttpHubService::Impl::UpdateCapacityMetrics()
-{
- m_HostMetrics = zen::GetSystemMetrics();
-
- // Update per-instance metrics
-}
-
-void
-HttpHubService::Impl::UpdateStats()
-{
- m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
-}
-
-bool
-HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
-{
- if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
- {
- OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
-
- return false;
- }
-
- if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
- {
- OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
-
- return false;
- }
-
- if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit)
- {
- OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit);
-
- return false;
- }
-
- // TODO: handle additional resource metrics
-
- return true;
-}
-
-///////////////////////////////////////////////////////////////////////////
-
-HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique<Impl>())
+HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
{
using namespace std::literals;
- m_Impl->Initialize(HubBaseDir, ChildBaseDir);
-
m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool {
for (const auto C : Str)
{
@@ -738,7 +37,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
Obj.BeginArray("modules");
- m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) {
+ m_Hub.EnumerateModules([&Obj](StorageServerInstance& Instance) {
Obj.BeginObject();
Obj << "moduleId" << Instance.GetModuleId();
Obj << "provisioned" << Instance.IsProvisioned();
@@ -775,8 +74,8 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
try
{
- Impl::ProvisionedInstanceInfo Info;
- if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason))
+ Hub::ProvisionedInstanceInfo Info;
+ if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason))
{
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
@@ -811,7 +110,7 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
try
{
- if (!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason))
+ if (!m_Hub.Deprovision(std::string(ModuleId), /* out */ FailureReason))
{
if (FailureReason.empty())
{
@@ -843,9 +142,9 @@ HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem
"stats",
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
- Obj << "currentInstanceCount" << m_Impl->GetInstanceCount();
- Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount();
- Obj << "instanceLimit" << m_Impl->GetInstanceLimit();
+ Obj << "currentInstanceCount" << m_Hub.GetInstanceCount();
+ Obj << "maxInstanceCount" << m_Hub.GetMaxInstanceCount();
+ Obj << "instanceLimit" << m_Hub.GetInstanceLimit();
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
},
HttpVerb::kGet);
@@ -855,12 +154,6 @@ HttpHubService::~HttpHubService()
{
}
-void
-HttpHubService::SetUseJobObject(bool Enable)
-{
- m_Impl->m_UseJobObject = Enable;
-}
-
const char*
HttpHubService::BaseUri() const
{
@@ -884,12 +177,15 @@ void
HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId)
{
StorageServerInstance* Instance = nullptr;
- if (!m_Impl->Find(ModuleId, &Instance))
+ if (!m_Hub.Find(ModuleId, &Instance))
{
Request.WriteResponse(HttpResponseCode::NotFound);
return;
}
+ // TODO: A separate http request for the modules/{moduleid}/deprovision endpoint can be called and deprovision the instance leaving us
+ // with a dangling pointer...
+
CbObjectWriter Obj;
Obj << "moduleId" << Instance->GetModuleId();
Obj << "provisioned" << Instance->IsProvisioned();
@@ -900,7 +196,7 @@ void
HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId)
{
StorageServerInstance* Instance = nullptr;
- if (!m_Impl->Find(ModuleId, &Instance))
+ if (!m_Hub.Find(ModuleId, &Instance))
{
Request.WriteResponse(HttpResponseCode::NotFound);
return;
diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h
index ef24bba69..d08eeea2a 100644
--- a/src/zenserver/hub/hubservice.h
+++ b/src/zenserver/hub/hubservice.h
@@ -4,10 +4,10 @@
#include <zenhttp/httpserver.h>
-#include "hydration.h"
-
namespace zen {
+class Hub;
+
/** ZenServer Hub Service
*
* Manages a set of storage servers on the behalf of external clients. For
@@ -17,7 +17,7 @@ namespace zen {
class HttpHubService : public zen::HttpService
{
public:
- HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir);
+ HttpHubService(Hub& Hub);
~HttpHubService();
HttpHubService(const HttpHubService&) = delete;
@@ -28,19 +28,10 @@ public:
void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId);
- /** Enable or disable the use of a Windows Job Object for child process management.
- * When enabled, all spawned child processes are assigned to a job object with
- * JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, ensuring children are terminated if the hub
- * crashes or is force-killed. Must be called before Initialize(). No-op on non-Windows.
- */
- void SetUseJobObject(bool Enable);
-
private:
HttpRequestRouter m_Router;
- struct Impl;
-
- std::unique_ptr<Impl> m_Impl;
+ Hub& m_Hub;
void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 52c17fe1a..0e78f8545 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -4,6 +4,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
namespace zen {
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index f86f2accf..95b5cfae0 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -2,7 +2,7 @@
#pragma once
-#include "zenhubserver.h"
+#include <filesystem>
namespace zen {
diff --git a/src/zenserver/hub/resourcemetrics.h b/src/zenserver/hub/resourcemetrics.h
new file mode 100644
index 000000000..d9b44b603
--- /dev/null
+++ b/src/zenserver/hub/resourcemetrics.h
@@ -0,0 +1,15 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+
+namespace zen {
+
+struct ResourceMetrics
+{
+ uint64_t DiskUsageBytes = 0;
+ uint64_t MemoryUsageBytes = 0;
+};
+
+} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
new file mode 100644
index 000000000..f24379715
--- /dev/null
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -0,0 +1,203 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "storageserverinstance.h"
+
+#include "hydration.h"
+
+#include <zencore/assertfmt.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+namespace zen {
+
+StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ std::string_view ModuleId,
+ std::filesystem::path FileHydrationPath,
+ std::filesystem::path HydrationTempPath)
+: m_ModuleId(ModuleId)
+, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
+, m_HydrationPath(FileHydrationPath)
+{
+ m_BaseDir = RunEnvironment.CreateChildDir(ModuleId);
+ m_TempDir = HydrationTempPath / ModuleId;
+}
+
+StorageServerInstance::~StorageServerInstance()
+{
+}
+
+void
+StorageServerInstance::SpawnServerProcess()
+{
+ ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
+
+ m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath());
+ m_ServerInstance.SetDataDir(m_BaseDir);
+#if ZEN_PLATFORM_WINDOWS
+ m_ServerInstance.SetJobObject(m_JobObject);
+#endif
+ const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady();
+
+ ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort);
+
+ m_ServerInstance.EnableShutdownOnDestroy();
+}
+
+void
+StorageServerInstance::Provision()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_IsProvisioned)
+ {
+ ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId);
+
+ return;
+ }
+
+ if (m_IsHibernated)
+ {
+ WakeLocked();
+ }
+ else
+ {
+ ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
+
+ Hydrate();
+
+ SpawnServerProcess();
+ }
+
+ m_IsProvisioned = true;
+}
+
+void
+StorageServerInstance::Deprovision()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (!m_IsProvisioned)
+ {
+ ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId);
+
+ return;
+ }
+
+ ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
+
+ m_ServerInstance.Shutdown();
+
+ Dehydrate();
+
+ m_IsProvisioned = false;
+}
+
+void
+StorageServerInstance::Hibernate()
+{
+ // Signal server to shut down, but keep data around for later wake
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (!m_IsProvisioned)
+ {
+ ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId);
+
+ return;
+ }
+
+ if (m_IsHibernated)
+ {
+ ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId);
+
+ return;
+ }
+
+ if (!m_ServerInstance.IsRunning())
+ {
+ ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId);
+
+ // This is an unexpected state. Should consider the instance invalid?
+
+ return;
+ }
+
+ try
+ {
+ m_ServerInstance.Shutdown();
+
+ m_IsHibernated = true;
+ m_IsProvisioned = false;
+
+ return;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what());
+ }
+}
+
+void
+StorageServerInstance::Wake()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ WakeLocked();
+}
+
+void
+StorageServerInstance::WakeLocked()
+{
+ // Start server in-place using existing data
+
+ if (!m_IsHibernated)
+ {
+ ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId);
+
+ return;
+ }
+
+ ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
+
+ try
+ {
+ SpawnServerProcess();
+ m_IsHibernated = false;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what());
+
+ // TODO: this instance should be marked as invalid
+ }
+}
+
+void
+StorageServerInstance::Hydrate()
+{
+ HydrationConfig Config{.ServerStateDir = m_BaseDir,
+ .TempDir = m_TempDir,
+ .ModuleId = m_ModuleId,
+ .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
+
+ Hydrator->Configure(Config);
+ Hydrator->Hydrate();
+}
+
+void
+StorageServerInstance::Dehydrate()
+{
+ HydrationConfig Config{.ServerStateDir = m_BaseDir,
+ .TempDir = m_TempDir,
+ .ModuleId = m_ModuleId,
+ .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
+
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate();
+}
+
+} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
new file mode 100644
index 000000000..a2f3d25d7
--- /dev/null
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -0,0 +1,68 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "resourcemetrics.h"
+
+#include <zenutil/zenserverprocess.h>
+
+#include <atomic>
+#include <filesystem>
+
+namespace zen {
+
+/**
+ * Storage Server Instance
+ *
+ * This class manages the lifecycle of a storage server instance, and
+ * provides functions to query its state. There should be one instance
+ * per module ID.
+ */
+class StorageServerInstance
+{
+public:
+ StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ std::string_view ModuleId,
+ std::filesystem::path FileHydrationPath,
+ std::filesystem::path HydrationTempPath);
+ ~StorageServerInstance();
+
+ void Provision();
+ void Deprovision();
+
+ void Hibernate();
+ void Wake();
+
+ const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; }
+
+ inline std::string_view GetModuleId() const { return m_ModuleId; }
+ inline bool IsProvisioned() const { return m_IsProvisioned.load(); }
+
+ inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); }
+
+#if ZEN_PLATFORM_WINDOWS
+ void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; }
+#endif
+
+private:
+ void WakeLocked();
+ RwLock m_Lock;
+ std::string m_ModuleId;
+ std::atomic<bool> m_IsProvisioned{false};
+ std::atomic<bool> m_IsHibernated{false};
+ ZenServerInstance m_ServerInstance;
+ std::filesystem::path m_BaseDir;
+ std::filesystem::path m_TempDir;
+ std::filesystem::path m_HydrationPath;
+ ResourceMetrics m_ResourceMetrics;
+#if ZEN_PLATFORM_WINDOWS
+ JobObject* m_JobObject = nullptr;
+#endif
+
+ void SpawnServerProcess();
+
+ void Hydrate();
+ void Dehydrate();
+};
+
+} // namespace zen
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index c6d2dc8d4..64ab0bd9e 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -1,6 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "zenhubserver.h"
+
+#include "frontend/frontend.h"
+#include "hub.h"
#include "hubservice.h"
#include <zencore/fmtutils.h>
@@ -9,7 +12,6 @@
#include <zencore/memory/tagtrace.h>
#include <zencore/scopeguard.h>
#include <zencore/sentryintegration.h>
-#include <zencore/system.h>
#include <zencore/windows.h>
#include <zenhttp/httpapiservice.h>
#include <zenutil/service.h>
@@ -116,11 +118,15 @@ ZenHubServer::Cleanup()
}
ShutdownServices();
-
if (m_Http)
{
m_Http->Close();
}
+
+ m_FrontendService.reset();
+ m_HubService.reset();
+ m_ApiService.reset();
+ m_Hub.reset();
}
catch (const std::exception& Ex)
{
@@ -139,11 +145,15 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
{
ZEN_UNUSED(ServerConfig);
+ ZEN_INFO("instantiating Hub");
+ m_Hub = std::make_unique<Hub>(
+ Hub::Configuration{.HubBaseDir = ServerConfig.DataDir / "hub", .ChildBaseDir = ServerConfig.DataDir / "servers"});
+
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
ZEN_INFO("instantiating hub service");
- m_HubService = std::make_unique<HttpHubService>(ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers");
+ m_HubService = std::make_unique<HttpHubService>(*m_Hub);
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 4c56fdce5..a3eeb2ead 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -2,7 +2,6 @@
#pragma once
-#include "frontend/frontend.h"
#include "zenserver.h"
namespace cxxopts {
@@ -15,6 +14,7 @@ struct Options;
namespace zen {
class HttpApiService;
+class HttpFrontendService;
class HttpHubService;
struct ZenHubServerConfig : public ZenServerConfig
@@ -23,6 +23,8 @@ struct ZenHubServerConfig : public ZenServerConfig
std::string InstanceId; // For use in notifications
};
+class Hub;
+
struct ZenHubServerConfigurator : public ZenServerConfiguratorBase
{
ZenHubServerConfigurator(ZenHubServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions), m_ServerOptions(ServerOptions)
@@ -82,6 +84,8 @@ private:
std::filesystem::path m_ContentRoot;
bool m_DebugOptionForcedCrash = false;
+ std::unique_ptr<Hub> m_Hub;
+
std::unique_ptr<HttpHubService> m_HubService;
std::unique_ptr<HttpApiService> m_ApiService;
std::unique_ptr<HttpFrontendService> m_FrontendService;