aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-10 09:40:21 +0100
committerGitHub Enterprise <[email protected]>2026-03-10 09:40:21 +0100
commit7a54bc0af3423923e30faf632c0862162e9af62d (patch)
tree50328fc2ed5de0ac1dd6fc8649dfcef730a56681 /src/zenserver/hub/hub.cpp
parentMerge pull request #752 from ue-foundation/lm/restrict-content-type (diff)
downloadzen-7a54bc0af3423923e30faf632c0862162e9af62d.tar.xz
zen-7a54bc0af3423923e30faf632c0862162e9af62d.zip
hubservice refactor (#819)
* move Hub to separate class * move StorageServerInstance to separate files * refactor HttpHubService to not own Hub instance
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r--src/zenserver/hub/hub.cpp379
1 files changed, 379 insertions, 0 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
new file mode 100644
index 000000000..f1cc86de6
--- /dev/null
+++ b/src/zenserver/hub/hub.cpp
@@ -0,0 +1,379 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hub.h"
+
+#include "storageserverinstance.h"
+
+#include <zencore/assertfmt.h>
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_vector.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+///////////////////////////////////////////////////////////////////////////
+
+/**
+ * A timeline of events with sequence IDs and timestamps. Used to
+ * track significant events for broadcasting to listeners.
+ */
+class EventTimeline
+{
+public:
+ EventTimeline() { m_Events.reserve(1024); }
+
+ ~EventTimeline() {}
+
+ EventTimeline(const EventTimeline&) = delete;
+ EventTimeline& operator=(const EventTimeline&) = delete;
+
+ void RecordEvent(std::string_view EventTag, CbObject EventMetadata)
+ {
+ const uint64_t SequenceId = m_NextEventId++;
+ const auto Now = std::chrono::steady_clock::now();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata));
+ }
+
+ struct EventRecord
+ {
+ uint64_t SequenceId;
+ std::string Tag;
+ std::chrono::steady_clock::time_point Timestamp;
+ CbObject EventMetadata;
+
+ EventRecord(uint64_t InSequenceId,
+ std::string_view InTag,
+ std::chrono::steady_clock::time_point InTimestamp,
+ CbObject InEventMetadata = CbObject())
+ : SequenceId(InSequenceId)
+ , Tag(InTag)
+ , Timestamp(InTimestamp)
+ , EventMetadata(InEventMetadata)
+ {
+ }
+ };
+
+ /**
+ * Iterate over events that have a SequenceId greater than SinceEventId
+ *
+ * @param Callback A callable that takes a const EventRecord&
+ * @param SinceEventId The SequenceId to compare against
+ */
+ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
+ {
+ // Hold the lock for as short a time as possible
+ eastl::fixed_vector<EventRecord, 128> EventsToProcess;
+ m_Lock.WithSharedLock([&] {
+ for (auto& Event : m_Events)
+ {
+ if (Event.SequenceId > SinceEventId)
+ {
+ EventsToProcess.push_back(Event);
+ }
+ }
+ });
+
+ // Now invoke the callback outside the lock
+ for (auto& Event : EventsToProcess)
+ {
+ Callback(Event);
+ }
+ }
+
+ /**
+ * Trim events up to (and including) the given SequenceId. Intended
+ * to be used for cleaning up events which are not longer interesting.
+ *
+ * @param UpToEventId The SequenceId up to which events should be removed
+ */
+ void TrimEventsUpTo(uint64_t UpToEventId)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
+ return Event.SequenceId <= UpToEventId;
+ });
+ m_Events.erase(It, m_Events.end());
+ }
+
+private:
+ std::atomic<uint64_t> m_NextEventId{0};
+
+ RwLock m_Lock;
+ std::vector<EventRecord> m_Events;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
/**
 * Construct the hub.
 *
 * Captures current host metrics, sets resource limits, prepares the
 * run-environment directories used for file hydration, pins the child
 * port range, and (Windows only, when Config.UseJobObject is set)
 * initializes a job object for child-process lifetime management.
 *
 * @param Config Hub configuration: base directories and job-object toggle
 */
Hub::Hub(const Configuration& Config) : m_UseJobObject(Config.UseJobObject)
{
    m_HostMetrics = GetSystemMetrics();
    // NOTE(review): hard-coded limits (1000 GiB disk, 16 GiB memory).
    // Presumably placeholders until limits become configurable -- confirm.
    m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
    m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;

    m_RunEnvironment.InitializeForHub(Config.HubBaseDir, Config.ChildBaseDir);

    // Directories handed to child storage-server instances (see Provision).
    m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
    ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);

    m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
    ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);

    // This is necessary to ensure the hub assigns a distinct port range.
    // We need to do this primarily because otherwise automated tests will
    // fail as the test runner will create processes in the default range.
    // We should probably make this configurable or dynamic for maximum
    // flexibility, and to allow running multiple hubs on the same host if
    // necessary.
    m_RunEnvironment.SetNextPortNumber(21000);

#if ZEN_PLATFORM_WINDOWS
    if (m_UseJobObject)
    {
        m_JobObject.Initialize();
        if (m_JobObject.IsValid())
        {
            ZEN_INFO("Job object initialized for hub service child process management");
        }
        else
        {
            // Degraded but non-fatal: children simply lose the auto-kill tie.
            ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash");
        }
    }
#endif
}
+
+Hub::~Hub()
+{
+ try
+ {
+ ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+
+ m_Lock.WithExclusiveLock([this] {
+ for (auto& [ModuleId, Instance] : m_Instances)
+ {
+ Instance->Deprovision();
+ }
+ m_Instances.clear();
+ });
+ }
+ catch (const std::exception& e)
+ {
+ ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ }
+}
+
/**
 * Provision (create and/or start) a storage server instance for the
 * given module, creating the instance entry first if needed.
 *
 * The expensive Instance->Provision() call runs *outside* m_Lock;
 * the module ID is parked in m_ProvisioningModules for that window so
 * Deprovision and CanProvisionInstance reject it meanwhile.
 *
 * NOTE(review): when the instance already exists, this path does not
 * check m_ProvisioningModules first -- two concurrent Provision calls
 * for the same existing module can both reach Instance->Provision(),
 * and the first to finish erases the set entry while the second is
 * still running. Confirm whether Provision() is safe to reenter.
 *
 * @param ModuleId Module to provision
 * @param OutInfo Receives the instance's base port on success
 * @param OutReason Receives a rejection reason on failure (may stay empty)
 * @return true on success, false otherwise
 */
bool
Hub::Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason)
{
    StorageServerInstance* Instance = nullptr;
    bool IsNewInstance = false;
    {
        RwLock::ExclusiveLockScope _(m_Lock);
        if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
        {
            std::string Reason;
            // Rejects modules mid-(de)provision and enforces m_InstanceLimit.
            if (!CanProvisionInstance(ModuleId, /* out */ Reason))
            {
                ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);

                OutReason = Reason;

                return false;
            }

            IsNewInstance = true;
            auto NewInstance =
                std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath);
#if ZEN_PLATFORM_WINDOWS
            if (m_JobObject.IsValid())
            {
                NewInstance->SetJobObject(&m_JobObject);
            }
#endif
            // Keep a raw pointer for use after the lock is dropped; the map
            // owns the instance. Safe only because Deprovision refuses to
            // remove modules present in m_ProvisioningModules.
            Instance = NewInstance.get();
            m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));

            ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
        }
        else
        {
            Instance = It->second.get();
        }

        m_ProvisioningModules.emplace(std::string(ModuleId));
    }

    ZEN_ASSERT(Instance != nullptr);

    // Guarantees the provisioning marker is cleared on every exit path.
    auto RemoveProvisioningModule = MakeGuard([&] {
        RwLock::ExclusiveLockScope _(m_Lock);
        m_ProvisioningModules.erase(std::string(ModuleId));
    });

    // NOTE: this is done while not holding the lock, as provisioning may take time
    // and we don't want to block other operations. We track which modules are being
    // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
    // those modules while in this state.

    UpdateStats();

    try
    {
        Instance->Provision();
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
        if (IsNewInstance)
        {
            // Clean up the half-created entry so a retry starts fresh.
            RwLock::ExclusiveLockScope _(m_Lock);
            m_Instances.erase(std::string(ModuleId));
        }
        return false;
    }

    OutInfo.Port = Instance->GetBasePort();

    // TODO: base URI? Would need to know what host name / IP to use

    return true;
}
+
+bool
+Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
+{
+ std::unique_ptr<StorageServerInstance> Instance;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
+
+ ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
+ {
+ ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
+
+ // Not found, OutReason should be empty
+ return false;
+ }
+ else
+ {
+ Instance = std::move(It->second);
+ m_Instances.erase(It);
+ m_DeprovisioningModules.emplace(ModuleId);
+ }
+ }
+
+ // The module is deprovisioned outside the lock to avoid blocking other operations.
+ //
+ // To ensure that no new provisioning can occur while we're deprovisioning,
+ // we add the module ID to m_DeprovisioningModules and remove it once
+ // deprovisioning is complete.
+
+ auto _ = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(ModuleId);
+ });
+
+ Instance->Deprovision();
+
+ return true;
+}
+
+bool
+Hub::Find(std::string_view ModuleId, StorageServerInstance** OutInstance)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ {
+ if (OutInstance)
+ {
+ *OutInstance = It->second.get();
+ }
+ return true;
+ }
+ else if (OutInstance)
+ {
+ *OutInstance = nullptr;
+ }
+ return false;
+}
+
+void
+Hub::EnumerateModules(std::function<void(StorageServerInstance&)> Callback)
+{
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& It : m_Instances)
+ {
+ Callback(*It.second);
+ }
+}
+
+int
+Hub::GetInstanceCount()
+{
+ RwLock::SharedLockScope _(m_Lock);
+ return gsl::narrow_cast<int>(m_Instances.size());
+}
+
/**
 * Refresh the cached host-level metrics snapshot.
 *
 * NOTE(review): m_HostMetrics is written here without holding m_Lock;
 * if other threads read it concurrently this is a data race -- confirm
 * the intended synchronization for this member.
 */
void
Hub::UpdateCapacityMetrics()
{
    m_HostMetrics = GetSystemMetrics();

    // Update per-instance metrics
}
+
+void
+Hub::UpdateStats()
+{
+ m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
+}
+
+bool
+Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
+{
+ if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
+
+ return false;
+ }
+
+ if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit)
+ {
+ OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit);
+
+ return false;
+ }
+
+ // TODO: handle additional resource metrics
+
+ return true;
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+} // namespace zen