aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-01-21 09:38:16 +0100
committerGitHub Enterprise <[email protected]>2026-01-21 09:38:16 +0100
commite8d162c293fbdf9a40a1369b60b80fa286aceb0f (patch)
tree0cdb2a913e5f4d6f2c151f36edba8fd5b2ca4f89 /src/zenserver
parentbuilds multipart upload (#722) (diff)
downloadzen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.tar.xz
zen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.zip
zen hub (#574)
Initial implementation of zenserver "hub" mode. This is an experimental feature. zenserver can be started in hub mode by specifying `hub` as the first argument to zenserver
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/hub/README.md28
-rw-r--r--src/zenserver/hub/hubservice.cpp867
-rw-r--r--src/zenserver/hub/hubservice.h42
-rw-r--r--src/zenserver/hub/hydration.cpp119
-rw-r--r--src/zenserver/hub/hydration.h40
-rw-r--r--src/zenserver/hub/zenhubserver.cpp303
-rw-r--r--src/zenserver/hub/zenhubserver.h92
-rw-r--r--src/zenserver/main.cpp4
-rw-r--r--src/zenserver/xmake.lua61
9 files changed, 1546 insertions, 10 deletions
diff --git a/src/zenserver/hub/README.md b/src/zenserver/hub/README.md
new file mode 100644
index 000000000..322be3649
--- /dev/null
+++ b/src/zenserver/hub/README.md
@@ -0,0 +1,28 @@
+# Zen Server Hub
+
+The Zen Server can act in a "hub" mode. In this mode, the only services offered are the basic health
+and diagnostic services alongside an API to provision and deprovision Storage server instances.
+
+## Generic Server API
+
+GET `/health` - returns an `OK!` payload when all enabled services are up and responding
+
+## Hub API
+
+GET `{moduleid}` - alphanumeric identifier to identify a dataset (typically associated with a content plug-in module)
+
+GET `/hub/status` - obtain a summary of the currently live instances
+
+GET `/hub/modules/{moduleid}` - retrieve information about a module
+
+POST `/hub/modules/{moduleid}/provision` - provision service for module
+
+POST `/hub/modules/{moduleid}/deprovision` - deprovision service for module
+
+GET `/hub/stats` - retrieve stats for service
+
+## Hub Configuration
+
+The hub service can use Consul to provide status updates
+
+The hub service can emit telemetry to an Open Telemetry collector
diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp
new file mode 100644
index 000000000..4d9da3a57
--- /dev/null
+++ b/src/zenserver/hub/hubservice.cpp
@@ -0,0 +1,867 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hubservice.h"
+
+#include "hydration.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/system.h>
+#include <zenutil/zenserverprocess.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <EASTL/fixed_vector.h>
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+///////////////////////////////////////////////////////////////////////////
+
+/**
+ * A timeline of events with sequence IDs and timestamps. Used to
+ * track significant events for broadcasting to listeners.
+ */
+class EventTimeline
+{
+public:
+ EventTimeline() { m_Events.reserve(1024); }
+
+ ~EventTimeline() {}
+
+ EventTimeline(const EventTimeline&) = delete;
+ EventTimeline& operator=(const EventTimeline&) = delete;
+
+ void RecordEvent(std::string_view EventTag, CbObject EventMetadata)
+ {
+ const uint64_t SequenceId = m_NextEventId++;
+ const auto Now = std::chrono::steady_clock::now();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata));
+ }
+
+ struct EventRecord
+ {
+ uint64_t SequenceId;
+ std::string Tag;
+ std::chrono::steady_clock::time_point Timestamp;
+ CbObject EventMetadata;
+
+ EventRecord(uint64_t InSequenceId,
+ std::string_view InTag,
+ std::chrono::steady_clock::time_point InTimestamp,
+ CbObject InEventMetadata = CbObject())
+ : SequenceId(InSequenceId)
+ , Tag(InTag)
+ , Timestamp(InTimestamp)
+ , EventMetadata(InEventMetadata)
+ {
+ }
+ };
+
+ /**
+ * Iterate over events that have a SequenceId greater than SinceEventId
+ *
+ * @param Callback A callable that takes a const EventRecord&
+ * @param SinceEventId The SequenceId to compare against
+ */
+ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId)
+ {
+ // Hold the lock for as short a time as possible
+ eastl::fixed_vector<EventRecord, 128> EventsToProcess;
+ m_Lock.WithSharedLock([&] {
+ for (auto& Event : m_Events)
+ {
+ if (Event.SequenceId > SinceEventId)
+ {
+ EventsToProcess.push_back(Event);
+ }
+ }
+ });
+
+ // Now invoke the callback outside the lock
+ for (auto& Event : EventsToProcess)
+ {
+ Callback(Event);
+ }
+ }
+
+ /**
+ * Trim events up to (and including) the given SequenceId. Intended
+ * to be used for cleaning up events which are not longer interesting.
+ *
+ * @param UpToEventId The SequenceId up to which events should be removed
+ */
+ void TrimEventsUpTo(uint64_t UpToEventId)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) {
+ return Event.SequenceId <= UpToEventId;
+ });
+ m_Events.erase(It, m_Events.end());
+ }
+
+private:
+ std::atomic<uint64_t> m_NextEventId{0};
+
+ RwLock m_Lock;
+ std::vector<EventRecord> m_Events;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct ResourceMetrics
+{
+ uint64_t DiskUsageBytes = 0;
+ uint64_t MemoryUsageBytes = 0;
+};
+
+/**
+ * Storage Server Instance
+ *
+ * This class manages the lifecycle of a storage server instance, and
+ * provides functions to query its state. There should be one instance
+ * per module ID.
+ */
+struct StorageServerInstance
+{
+ StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ std::string_view ModuleId,
+ std::filesystem::path FileHydrationPath,
+ std::filesystem::path HydrationTempPath);
+ ~StorageServerInstance();
+
+ void Provision();
+ void Deprovision();
+
+ void Hibernate();
+ void Wake();
+
+ const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; }
+
+ inline std::string_view GetModuleId() const { return m_ModuleId; }
+ inline bool IsProvisioned() const { return m_IsProvisioned.load(); }
+
+ inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); }
+
+private:
+ RwLock m_Lock;
+ std::string m_ModuleId;
+ std::atomic<bool> m_IsProvisioned{false};
+ std::atomic<bool> m_IsHibernated{false};
+ ZenServerInstance m_ServerInstance;
+ std::filesystem::path m_BaseDir;
+ std::filesystem::path m_TempDir;
+ std::filesystem::path m_HydrationPath;
+ ResourceMetrics m_ResourceMetrics;
+
+ void SpawnServerProcess();
+
+ void Hydrate();
+ void Dehydrate();
+};
+
+StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment,
+ std::string_view ModuleId,
+ std::filesystem::path FileHydrationPath,
+ std::filesystem::path HydrationTempPath)
+: m_ModuleId(ModuleId)
+, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer)
+, m_HydrationPath(FileHydrationPath)
+{
+ m_BaseDir = RunEnvironment.CreateChildDir(ModuleId);
+ m_TempDir = HydrationTempPath / ModuleId;
+}
+
+StorageServerInstance::~StorageServerInstance()
+{
+}
+
+void
+StorageServerInstance::SpawnServerProcess()
+{
+ ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
+
+ m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath());
+ m_ServerInstance.SetDataDir(m_BaseDir);
+ const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady();
+
+ ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort);
+
+ m_ServerInstance.EnableShutdownOnDestroy();
+}
+
+void
+StorageServerInstance::Provision()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_IsProvisioned)
+ {
+ ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId);
+
+ return;
+ }
+
+ if (m_IsHibernated)
+ {
+ Wake();
+ }
+ else
+ {
+ ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
+
+ Hydrate();
+
+ SpawnServerProcess();
+ }
+
+ m_IsProvisioned = true;
+}
+
+void
+StorageServerInstance::Deprovision()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (!m_IsProvisioned)
+ {
+ ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId);
+
+ return;
+ }
+
+ ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
+
+ m_ServerInstance.Shutdown();
+
+ Dehydrate();
+
+ m_IsProvisioned = false;
+}
+
+void
+StorageServerInstance::Hibernate()
+{
+ // Signal server to shut down, but keep data around for later wake
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (!m_IsProvisioned)
+ {
+ ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId);
+
+ return;
+ }
+
+ if (m_IsHibernated)
+ {
+ ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId);
+
+ return;
+ }
+
+ if (!m_ServerInstance.IsRunning())
+ {
+ ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId);
+
+ // This is an unexpected state. Should consider the instance invalid?
+
+ return;
+ }
+
+ try
+ {
+ m_ServerInstance.Shutdown();
+
+ m_IsHibernated = true;
+ m_IsProvisioned = false;
+
+ return;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what());
+ }
+}
+
+void
+StorageServerInstance::Wake()
+{
+ // Start server in-place using existing data
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (!m_IsHibernated)
+ {
+ ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId);
+
+ return;
+ }
+
+ ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
+
+ try
+ {
+ SpawnServerProcess();
+ m_IsHibernated = false;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what());
+
+ // TODO: this instance should be marked as invalid
+ }
+}
+
+void
+StorageServerInstance::Hydrate()
+{
+ HydrationConfig Config{.ServerStateDir = m_BaseDir,
+ .TempDir = m_TempDir,
+ .ModuleId = m_ModuleId,
+ .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
+
+ Hydrator->Configure(Config);
+ Hydrator->Hydrate();
+}
+
+void
+StorageServerInstance::Dehydrate()
+{
+ HydrationConfig Config{.ServerStateDir = m_BaseDir,
+ .TempDir = m_TempDir,
+ .ModuleId = m_ModuleId,
+ .TargetSpecification = WideToUtf8(m_HydrationPath.native())};
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator();
+
+ Hydrator->Configure(Config);
+ Hydrator->Dehydrate();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+struct HttpHubService::Impl
+{
+ Impl(const Impl&) = delete;
+ Impl& operator=(const Impl&) = delete;
+
+ Impl();
+ ~Impl();
+
+ void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir)
+ {
+ m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir);
+ m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
+ ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath);
+
+ m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
+ ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath);
+
+ // This is necessary to ensure the hub assigns a distinct port range.
+ // We need to do this primarily because otherwise automated tests will
+ // fail as the test runner will create processes in the default range.
+ // We should probably make this configurable or dynamic for maximum
+ // flexibility, and to allow running multiple hubs on the same host if
+ // necessary.
+ m_RunEnvironment.SetNextPortNumber(21000);
+ }
+
+ void Cleanup()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Instances.clear();
+ }
+
+ struct ProvisionedInstanceInfo
+ {
+ std::string BaseUri;
+ uint16_t Port;
+ };
+
+ /**
+ * Provision a storage server instance for the given module ID.
+ *
+ * @param ModuleId The ID of the module to provision.
+ * @param OutInfo If successful, information about the provisioned instance will be returned here.
+ * @param OutReason If unsuccessful, the reason will be returned here.
+ */
+ bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+ {
+ StorageServerInstance* Instance = nullptr;
+ bool IsNewInstance = false;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end())
+ {
+ std::string Reason;
+ if (!CanProvisionInstance(ModuleId, /* out */ Reason))
+ {
+ ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
+
+ OutReason = Reason;
+
+ return false;
+ }
+
+ IsNewInstance = true;
+ auto NewInstance =
+ std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath);
+ Instance = NewInstance.get();
+ m_Instances.emplace(std::string(ModuleId), std::move(NewInstance));
+
+ ZEN_INFO("Created new storage server instance for module '{}'", ModuleId);
+ }
+ else
+ {
+ Instance = It->second.get();
+ }
+
+ m_ProvisioningModules.emplace(std::string(ModuleId));
+ }
+
+ ZEN_ASSERT(Instance != nullptr);
+
+ auto RemoveProvisioningModule = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ProvisioningModules.erase(std::string(ModuleId));
+ });
+
+ // NOTE: this is done while not holding the lock, as provisioning may take time
+ // and we don't want to block other operations. We track which modules are being
+ // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
+ // those modules while in this state.
+
+ UpdateStats();
+
+ try
+ {
+ Instance->Provision();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ if (IsNewInstance)
+ {
+ // Clean up
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Instances.erase(std::string(ModuleId));
+ }
+ return false;
+ }
+
+ OutInfo.Port = Instance->GetBasePort();
+
+ // TODO: base URI? Would need to know what host name / IP to use
+
+ return true;
+ }
+
+ /**
+ * Deprovision a storage server instance for the given module ID.
+ *
+ * @param ModuleId The ID of the module to deprovision.
+ * @param OutReason If unsuccessful, the reason will be returned here.
+ * @return true if the instance was found and deprovisioned, false otherwise.
+ */
+ bool Deprovision(const std::string& ModuleId, std::string& OutReason)
+ {
+ std::unique_ptr<StorageServerInstance> Instance;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
+
+ ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (auto It = m_Instances.find(ModuleId); It == m_Instances.end())
+ {
+ ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
+
+ // Not found, OutReason should be empty
+ return false;
+ }
+ else
+ {
+ Instance = std::move(It->second);
+ m_Instances.erase(It);
+ m_DeprovisioningModules.emplace(ModuleId);
+ }
+ }
+
+ // The module is deprovisioned outside the lock to avoid blocking other operations.
+ //
+ // To ensure that no new provisioning can occur while we're deprovisioning,
+ // we add the module ID to m_DeprovisioningModules and remove it once
+ // deprovisioning is complete.
+
+ auto _ = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(ModuleId);
+ });
+
+ Instance->Deprovision();
+
+ return true;
+ }
+
+ /**
+ * Find a storage server instance for the given module ID.
+ *
+ * Beware that as this returns a raw pointer to the instance, the caller must ensure
+ * that the instance is not deprovisioned while in use.
+ *
+ * @param ModuleId The ID of the module to find.
+ * @param OutInstance If found, the instance will be returned here.
+ * @return true if the instance was found, false otherwise.
+ */
+ bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr)
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end())
+ {
+ if (OutInstance)
+ {
+ *OutInstance = It->second.get();
+ }
+ return true;
+ }
+ else if (OutInstance)
+ {
+ *OutInstance = nullptr;
+ }
+ return false;
+ }
+
+ /**
+ * Enumerate all storage server instances.
+ *
+ * @param Callback The callback to invoke for each instance. Note that you should
+ * not do anything heavyweight in the callback as it is invoked while holding
+ * a shared lock.
+ */
+ void EnumerateModules(auto&& Callback)
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& It : m_Instances)
+ {
+ Callback(*It.second);
+ }
+ }
+
+ int GetInstanceCount()
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ return gsl::narrow_cast<int>(m_Instances.size());
+ }
+
+ inline int GetInstanceLimit() { return m_InstanceLimit; }
+ inline int GetMaxInstanceCount() { return m_MaxInstanceCount; }
+
+private:
+ ZenServerEnvironment m_RunEnvironment;
+ std::filesystem::path m_FileHydrationPath;
+ std::filesystem::path m_HydrationTempPath;
+ RwLock m_Lock;
+ std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances;
+ std::unordered_set<std::string> m_DeprovisioningModules;
+ std::unordered_set<std::string> m_ProvisioningModules;
+ int m_MaxInstanceCount = 0;
+ void UpdateStats();
+
+ // Capacity tracking
+
+ int m_InstanceLimit = 1000;
+ ResourceMetrics m_ResourceLimits;
+ SystemMetrics m_HostMetrics;
+
+ void UpdateCapacityMetrics();
+ bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+};
+
+HttpHubService::Impl::Impl()
+{
+ m_HostMetrics = zen::GetSystemMetrics();
+ m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
+ m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024;
+}
+
+HttpHubService::Impl::~Impl()
+{
+ try
+ {
+ ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+
+ m_Lock.WithExclusiveLock([this] {
+ for (auto& [ModuleId, Instance] : m_Instances)
+ {
+ Instance->Deprovision();
+ }
+ m_Instances.clear();
+ });
+ }
+ catch (const std::exception& e)
+ {
+ ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ }
+}
+
+void
+HttpHubService::Impl::UpdateCapacityMetrics()
+{
+ m_HostMetrics = zen::GetSystemMetrics();
+
+ // Update per-instance metrics
+}
+
+void
+HttpHubService::Impl::UpdateStats()
+{
+ m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); });
+}
+
+bool
+HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
+{
+ if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
+
+ return false;
+ }
+
+ if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
+ {
+ OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
+
+ return false;
+ }
+
+ if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit)
+ {
+ OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit);
+
+ return false;
+ }
+
+ // TODO: handle additional resource metrics
+
+ return true;
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique<Impl>())
+{
+ using namespace std::literals;
+
+ m_Impl->Initialize(HubBaseDir, ChildBaseDir);
+
+ m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool {
+ for (const auto C : Str)
+ {
+ if (std::isalnum(C) || C == '-')
+ {
+ // fine
+ }
+ else
+ {
+ // not fine
+ return false;
+ }
+ }
+
+ return true;
+ });
+
+ m_Router.RegisterRoute(
+ "status",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Obj.BeginArray("modules");
+ m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) {
+ Obj.BeginObject();
+ Obj << "moduleId" << Instance.GetModuleId();
+ Obj << "provisioned" << Instance.IsProvisioned();
+ Obj.EndObject();
+ });
+ Obj.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "modules/{moduleid}",
+ [this](HttpRouterRequest& Req) {
+ std::string_view ModuleId = Req.GetCapture(1);
+
+ if (Req.ServerRequest().RequestVerb() == HttpVerb::kDelete)
+ {
+ HandleModuleDelete(Req.ServerRequest(), ModuleId);
+ }
+ else
+ {
+ HandleModuleGet(Req.ServerRequest(), ModuleId);
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "modules/{moduleid}/provision",
+ [this](HttpRouterRequest& Req) {
+ std::string_view ModuleId = Req.GetCapture(1);
+
+ std::string FailureReason = "unknown";
+ HttpResponseCode ResponseCode = HttpResponseCode::OK;
+
+ try
+ {
+ Impl::ProvisionedInstanceInfo Info;
+ if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason))
+ {
+ CbObjectWriter Obj;
+ Obj << "moduleId" << ModuleId;
+ Obj << "baseUri" << Info.BaseUri;
+ Obj << "port" << Info.Port;
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+
+ return;
+ }
+ else
+ {
+ ResponseCode = HttpResponseCode::BadRequest;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what());
+
+ FailureReason = Ex.what();
+ ResponseCode = HttpResponseCode::InternalServerError;
+ }
+
+ Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "modules/{moduleid}/deprovision",
+ [this](HttpRouterRequest& Req) {
+ std::string_view ModuleId = Req.GetCapture(1);
+ std::string FailureReason = "unknown";
+
+ try
+ {
+ if (!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason))
+ {
+ if (FailureReason.empty())
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
+ }
+ }
+
+ CbObjectWriter Obj;
+ Obj << "moduleId" << ModuleId;
+
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what());
+
+ FailureReason = Ex.what();
+ }
+
+ Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "stats",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Obj << "currentInstanceCount" << m_Impl->GetInstanceCount();
+ Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount();
+ Obj << "instanceLimit" << m_Impl->GetInstanceLimit();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kGet);
+}
+
+HttpHubService::~HttpHubService()
+{
+}
+
+const char*
+HttpHubService::BaseUri() const
+{
+ return "/hub/";
+}
+
+void
+HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId)
+{
+ ZEN_UNUSED(UpstreamNotificationEndpoint, InstanceId);
+ // TODO: store these for use in notifications, on some interval/criteria which is currently TBD
+}
+
+void
+HttpHubService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ m_Router.HandleRequest(Request);
+}
+
+void
+HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId)
+{
+ StorageServerInstance* Instance = nullptr;
+ if (!m_Impl->Find(ModuleId, &Instance))
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ return;
+ }
+
+ CbObjectWriter Obj;
+ Obj << "moduleId" << Instance->GetModuleId();
+ Obj << "provisioned" << Instance->IsProvisioned();
+ Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
+}
+
+void
+HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId)
+{
+ StorageServerInstance* Instance = nullptr;
+ if (!m_Impl->Find(ModuleId, &Instance))
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ return;
+ }
+
+ // TODO: deprovision and nuke all related storage
+
+ CbObjectWriter Obj;
+ Obj << "moduleId" << Instance->GetModuleId();
+ Obj << "provisioned" << Instance->IsProvisioned();
+ Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
+}
+
+} // namespace zen
diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h
new file mode 100644
index 000000000..1a5a8c57c
--- /dev/null
+++ b/src/zenserver/hub/hubservice.h
@@ -0,0 +1,42 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#include "hydration.h"
+
+namespace zen {
+
+/** ZenServer Hub Service
+ *
+ * Manages a set of storage servers on the behalf of external clients. For
+ * use in UEFN content worker style scenarios.
+ *
+ */
+class HttpHubService : public zen::HttpService
+{
+public:
+ HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir);
+ ~HttpHubService();
+
+ HttpHubService(const HttpHubService&) = delete;
+ HttpHubService& operator=(const HttpHubService&) = delete;
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+ void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId);
+
+private:
+ HttpRequestRouter m_Router;
+
+ struct Impl;
+
+ std::unique_ptr<Impl> m_Impl;
+
+ void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
+ void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
+};
+
+} // namespace zen
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
new file mode 100644
index 000000000..52c17fe1a
--- /dev/null
+++ b/src/zenserver/hub/hydration.cpp
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hydration.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+
+namespace zen {
+
+///////////////////////////////////////////////////////////////////////////
+
+struct FileHydrator : public HydrationStrategyBase
+{
+ virtual void Configure(const HydrationConfig& Config) override;
+ virtual void Hydrate() override;
+ virtual void Dehydrate() override;
+
+private:
+ HydrationConfig m_Config;
+ std::filesystem::path m_StorageModuleRootDir;
+};
+
+void
+FileHydrator::Configure(const HydrationConfig& Config)
+{
+ m_Config = Config;
+
+ std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification));
+
+ if (!std::filesystem::exists(ConfigPath))
+ {
+ throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string()));
+ }
+
+ m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId;
+
+ CreateDirectories(m_StorageModuleRootDir);
+}
+
+void
+FileHydrator::Hydrate()
+{
+ ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+
+ // Ensure target is clean
+ ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir);
+ const bool ForceRemoveReadOnlyFiles = true;
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+
+ bool WipeServerState = false;
+
+ try
+ {
+ ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+ CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true});
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir);
+
+ // We don't do the clean right here to avoid potentially running into double-throws
+ WipeServerState = true;
+ }
+
+ if (WipeServerState)
+ {
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ }
+
+ // Note that we leave the storage state intact until next dehydration replaces the content
+}
+
+void
+FileHydrator::Dehydrate()
+{
+ ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir);
+
+ const std::filesystem::path TargetDir = m_StorageModuleRootDir;
+
+ // Ensure target is clean. This could be replaced with an atomic copy at a later date
+ // (i.e copy into a temporary directory name and rename it once complete)
+
+ ZEN_DEBUG("Cleaning storage root '{}'", TargetDir);
+ const bool ForceRemoveReadOnlyFiles = true;
+ CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
+
+ bool CopySuccess = true;
+
+ try
+ {
+ ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir);
+ CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true});
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir);
+
+ // We don't do the clean right here to avoid potentially running into double-throws
+ CopySuccess = false;
+ }
+
+ if (!CopySuccess)
+ {
+ ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir);
+ CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles);
+ }
+
+ ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+}
+
+std::unique_ptr<HydrationStrategyBase>
+CreateFileHydrator()
+{
+ return std::make_unique<FileHydrator>();
+}
+
+} // namespace zen
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
new file mode 100644
index 000000000..f86f2accf
--- /dev/null
+++ b/src/zenserver/hub/hydration.h
@@ -0,0 +1,40 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenhubserver.h"
+
+namespace zen {
+
+struct HydrationConfig
+{
+ // Location of server state to hydrate/dehydrate
+ std::filesystem::path ServerStateDir;
+ // Temporary directory available for use during hydration/dehydration
+ std::filesystem::path TempDir;
+ // Module ID of the server state being hydrated/dehydrated
+ std::string ModuleId;
+ // Back-end specific target specification (e.g. S3 bucket, file path, etc)
+ std::string TargetSpecification;
+};
+
+/**
+ * @brief State hydration strategy interface
+ *
+ * An instance of this interface is used to perform hydration OR
+ * dehydration of server state. It's expected to be used only once
+ * and not reused.
+ *
+ */
+struct HydrationStrategyBase
+{
+ virtual ~HydrationStrategyBase() = default;
+
+ virtual void Dehydrate() = 0;
+ virtual void Hydrate() = 0;
+ virtual void Configure(const HydrationConfig& Config) = 0;
+};
+
+std::unique_ptr<HydrationStrategyBase> CreateFileHydrator();
+
+} // namespace zen
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
new file mode 100644
index 000000000..7a4ba951d
--- /dev/null
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -0,0 +1,303 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenhubserver.h"
+#include "hubservice.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/memory/llm.h>
+#include <zencore/memory/memorytrace.h>
+#include <zencore/memory/tagtrace.h>
+#include <zencore/scopeguard.h>
+#include <zencore/sentryintegration.h>
+#include <zencore/system.h>
+#include <zencore/windows.h>
+#include <zenhttp/httpapiservice.h>
+#include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+void
+ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+ Options.add_option("hub",
+ "",
+ "upstream-notification-endpoint",
+ "Endpoint URL for upstream notifications",
+ cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
+ "");
+}
+
+void
+ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenHubServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+ ZEN_UNUSED(LuaOptions);
+}
+
+void
+ZenHubServerConfigurator::ValidateOptions()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+ZenHubServer::ZenHubServer()
+{
+}
+
+ZenHubServer::~ZenHubServer()
+{
+ Cleanup();
+}
+
+int
+ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenHubServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode");
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ // This is a workaround to make sure we can have automated tests. Without
+ // this the ranges for different child zen hub processes could overlap with
+ // the main test range.
+ ZenServerEnvironment::SetBaseChildId(1000);
+
+ m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
+
+ InitializeState(ServerConfig);
+ InitializeServices(ServerConfig);
+ RegisterServices(ServerConfig);
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+void
+ZenHubServer::Cleanup()
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+void
+ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+}
+
+void
+ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+
+ ZEN_INFO("instantiating API service");
+ m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
+
+ ZEN_INFO("instantiating hub service");
+ m_HubService = std::make_unique<HttpHubService>(ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers");
+ m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
+}
+
+void
+ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+
+ if (m_HubService)
+ {
+ m_Http->RegisterService(*m_HubService);
+ }
+
+ if (m_ApiService)
+ {
+ m_Http->RegisterService(*m_ApiService);
+ }
+}
+
+void
+ZenHubServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ // clang-format off
+ ZEN_INFO(R"(__________ ___ ___ ___. )" "\n"
+ R"(\____ /____ ____ / | \ __ _\_ |__ )" "\n"
+ R"( / // __ \ / \ / ~ \ | \ __ \ )" "\n"
+ R"( / /\ ___/| | \ \ Y / | / \_\ \)" "\n"
+ R"(/_______ \___ >___| / \___|_ /|____/|___ /)" "\n"
+ R"( \/ \/ \/ \/ \/ )");
+ // clang-format on
+
+ ExtendableStringBuilder<256> BuildOptions;
+ GetBuildOptions(BuildOptions, '\n');
+ ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running as HUB (pid: {})", GetCurrentProcessId());
+
+#if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+#endif
+
+#if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+#endif
+
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+
+ZenHubServerMain::ZenHubServerMain(ZenHubServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions)
+{
+}
+
+void
+ZenHubServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenHubServer Server;
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ if (EffectiveBasePort == -1)
+ {
+ // Server.Initialize has already logged what the issue is - just exit with failure code here.
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_mon");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
new file mode 100644
index 000000000..ac14362f0
--- /dev/null
+++ b/src/zenserver/hub/zenhubserver.h
@@ -0,0 +1,92 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenserver.h"
+
+namespace cxxopts {
+class Options;
+}
+namespace zen::LuaConfig {
+struct Options;
+}
+
+namespace zen {
+
+class HttpApiService;
+class HttpHubService;
+
+struct ZenHubServerConfig : public ZenServerConfig
+{
+ std::string UpstreamNotificationEndpoint;
+ std::string InstanceId; // For use in notifications
+};
+
+struct ZenHubServerConfigurator : public ZenServerConfiguratorBase
+{
+ ZenHubServerConfigurator(ZenHubServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions), m_ServerOptions(ServerOptions)
+ {
+ }
+
+ ~ZenHubServerConfigurator() = default;
+
+private:
+ virtual void AddCliOptions(cxxopts::Options& Options) override;
+ virtual void AddConfigOptions(LuaConfig::Options& Options) override;
+ virtual void ApplyOptions(cxxopts::Options& Options) override;
+ virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
+ virtual void ValidateOptions() override;
+
+ ZenHubServerConfig& m_ServerOptions;
+};
+
+class ZenHubServerMain : public ZenServerMain
+{
+public:
+ ZenHubServerMain(ZenHubServerConfig& ServerOptions);
+ virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override;
+
+ ZenHubServerMain(const ZenHubServerMain&) = delete;
+ ZenHubServerMain& operator=(const ZenHubServerMain&) = delete;
+
+ typedef ZenHubServerConfig Config;
+ typedef ZenHubServerConfigurator Configurator;
+
+private:
+ ZenHubServerConfig& m_ServerOptions;
+};
+
+class ZenHubServer : public ZenServerBase
+{
+ ZenHubServer& operator=(ZenHubServer&&) = delete;
+ ZenHubServer(ZenHubServer&&) = delete;
+
+public:
+ ZenHubServer();
+ ~ZenHubServer();
+
+ int Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
+ void Run();
+ void Cleanup();
+
+ void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
+ void SetTestMode(bool State) { m_TestMode = State; }
+ void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
+
+private:
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
+ bool m_DebugOptionForcedCrash = false;
+
+ std::unique_ptr<HttpHubService> m_HubService;
+ std::unique_ptr<HttpApiService> m_ApiService;
+
+ void InitializeState(const ZenHubServerConfig& ServerConfig);
+ void InitializeServices(const ZenHubServerConfig& ServerConfig);
+ void RegisterServices(const ZenHubServerConfig& ServerConfig);
+};
+
+} // namespace zen
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 78bce7d06..996f96da8 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -25,6 +25,8 @@
#include "storage/storageconfig.h"
#include "storage/zenstorageserver.h"
+#include "hub/zenhubserver.h"
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
# include <zenutil/windows/windowsservice.h>
@@ -273,6 +275,8 @@ main(int argc, char* argv[])
exit(5);
#endif
break;
+ case kHub:
+ return AppMain<ZenHubServerMain>(argc, argv);
default:
case kStore:
return AppMain<ZenStorageServerMain>(argc, argv);
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua
index fb65fa949..6ee80dc62 100644
--- a/src/zenserver/xmake.lua
+++ b/src/zenserver/xmake.lua
@@ -22,6 +22,7 @@ target("zenserver")
add_deps("sol2")
add_packages("json11")
add_packages("lua")
+ add_packages("consul")
if has_config("zenmimalloc") then
add_packages("mimalloc")
@@ -55,10 +56,7 @@ target("zenserver")
add_ldflags("-framework Security")
add_ldflags("-framework SystemConfiguration")
end
-
- add_options("compute")
- add_options("exec")
-
+
-- to work around some unfortunate Ctrl-C behaviour on Linux/Mac due to
-- our use of setsid() at startup we pass in `--no-detach` to zenserver
-- ensure that it recieves signals when the user requests termination
@@ -87,17 +85,60 @@ target("zenserver")
end
end)
+
after_build(function (target)
- if has_config("zensentry") then
- local crashpad_handler = "crashpad_handler"
- if is_plat("windows") then
- crashpad_handler = "crashpad_handler.exe"
+ local function copy_if_newer(src_file, dst_file, file_description)
+ if not os.exists(src_file) then
+ print("Source file '" .. src_file .. "' does not exist, cannot copy " .. file_description)
+ return
end
+ local should_copy = false
+ if not os.exists(dst_file) then
+ should_copy = true
+ else
+ local src_size = os.filesize(src_file)
+ local dst_size = os.filesize(dst_file)
+ local src_mtime = os.mtime(src_file)
+ local dst_mtime = os.mtime(dst_file)
+
+ if src_size ~= dst_size or src_mtime > dst_mtime then
+ should_copy = true
+ end
+ end
+
+ if should_copy then
+ os.cp(src_file, dst_file)
+ print("Copied '" .. file_description .. "' to output directory")
+ end
+ end
+
+ if has_config("zensentry") then
local pkg = target:pkg("sentry-native")
if pkg then
local installdir = pkg:installdir()
- os.cp(path.join(installdir, "bin/" .. crashpad_handler), target:targetdir())
- print("Copied " .. crashpad_handler .. " to output directory")
+
+ local crashpad_handler = "crashpad_handler"
+ if is_plat("windows") then
+ crashpad_handler = "crashpad_handler.exe"
+ end
+
+ local crashpad_handler_path = path.join(installdir, "bin/" .. crashpad_handler)
+ copy_if_newer(crashpad_handler_path, path.join(target:targetdir(), crashpad_handler), crashpad_handler)
+
+ if is_plat("windows") then
+ local crashpad_wer_path = path.join(installdir, "bin/crashpad_wer.dll")
+ copy_if_newer(crashpad_wer_path, path.join(target:targetdir(), "crashpad_wer.dll"), "crashpad_wer.dll")
+ end
+ end
+ end
+
+ local consul_pkg = target:pkg("consul")
+ if consul_pkg then
+ local installdir = consul_pkg:installdir()
+ local consul_bin = "consul"
+ if is_plat("windows") then
+ consul_bin = "consul.exe"
end
+ copy_if_newer(path.join(installdir, "bin", consul_bin), path.join(target:targetdir(), consul_bin), consul_bin)
end
end)