diff options
| author | Stefan Boberg <[email protected]> | 2026-01-21 09:38:16 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-01-21 09:38:16 +0100 |
| commit | e8d162c293fbdf9a40a1369b60b80fa286aceb0f (patch) | |
| tree | 0cdb2a913e5f4d6f2c151f36edba8fd5b2ca4f89 /src | |
| parent | builds multipart upload (#722) (diff) | |
| download | zen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.tar.xz zen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.zip | |
zen hub (#574)
Initial implementation of zenserver "hub" mode. This is an experimental feature.
zenserver can be started in hub mode by specifying `hub` as the first argument to zenserver
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 252 | ||||
| -rw-r--r-- | src/zenserver/hub/README.md | 28 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.cpp | 867 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.h | 42 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 119 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 40 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 303 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 92 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/xmake.lua | 61 |
10 files changed, 1798 insertions, 10 deletions
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp new file mode 100644 index 000000000..42a5dcae4 --- /dev/null +++ b/src/zenserver-test/hub-tests.cpp @@ -0,0 +1,252 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#if ZEN_WITH_TESTS +# include "zenserver-test.h" +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/filesystem.h> +# include <zencore/stream.h> +# include <zencore/string.h> +# include <zencore/fmtutils.h> +# include <zencore/scopeguard.h> +# include <zenhttp/packageformat.h> +# include <zenremotestore/builds/buildstoragecache.h> +# include <zenutil/workerpools.h> +# include <zenutil/zenserverprocess.h> +# include <zenhttp/httpclient.h> +# include <zenutil/consul.h> + +namespace zen::tests::hub { + +using namespace std::literals; + +TEST_SUITE_BEGIN("hub.lifecycle"); + +TEST_CASE("hub.lifecycle.basic") +{ + { + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/hub/"); + + HttpClient::Response Result = Client.Get("status"); + CHECK(Result); + } +} + +TEST_CASE("hub.lifecycle.children") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE(PortNumber != 0); + + SUBCASE("spawn") + { + HttpClient Client(Instance.GetBaseUri() + "/hub/"); + + HttpClient::Response Result = Client.Get("status"); + REQUIRE(Result); + + { + Result = Client.Post("modules/abc/provision"); + REQUIRE(Result); + + CbObject AbcResult = Result.AsObject(); + CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); + CHECK_NE(AbcPort, 0); + + // This should be a fresh instance with no contents + + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + Result = AbcClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("abcdef"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } + + { + Result = Client.Post("modules/def/provision"); + REQUIRE(Result); + + CbObject DefResult = Result.AsObject(); + CHECK(DefResult["moduleId"].AsString() == "def"sv); + const uint16_t DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); + + // This should be a fresh instance with no contents + + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + Result = DefClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("AbcDef"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } + + // this should be rejected because of the invalid module id + Result = Client.Post("modules/!!!!!/provision"); + CHECK(!Result); + + Result = Client.Post("modules/ghi/provision"); + REQUIRE(Result); + + // Tear down instances + + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + + Result = Client.Post("modules/ghi/deprovision"); + REQUIRE(Result); + + // re-provision to verify that (de)hydration preserved state + { + Result = Client.Post("modules/abc/provision"); + REQUIRE(Result); + + CbObject AbcResult = Result.AsObject(); + CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); + REQUIRE_NE(AbcPort, 0); + + // This should contain the content from the previous run + + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "abcdef"sv); + + Result = AbcClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("ghijklmnop"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } + + { + Result = Client.Post("modules/def/provision"); + REQUIRE(Result); + + CbObject DefResult = Result.AsObject(); + CHECK(DefResult["moduleId"].AsString() == "def"sv); + const uint16_t DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); + + // This should contain the content from the previous run + + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "AbcDef"sv); + + Result = DefClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("GhijklmNop"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } + + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + + // re-provision to verify that (de)hydration preserved state, including + // state which was generated after the very first dehydration + { + Result = Client.Post("modules/abc/provision"); + REQUIRE(Result); + + CbObject AbcResult = Result.AsObject(); + CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); + REQUIRE_NE(AbcPort, 0); + + // This should contain the content from the previous two runs + + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "abcdef"sv); + + Result = AbcClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "ghijklmnop"sv); + } + + { + Result = Client.Post("modules/def/provision"); + REQUIRE(Result); + + CbObject DefResult = Result.AsObject(); + REQUIRE(DefResult["moduleId"].AsString() == "def"sv); + const uint16_t DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); + + // This should contain the content from the previous two runs + + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "AbcDef"sv); + + Result = DefClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + CHECK_EQ(Result.AsText(), "GhijklmNop"sv); + } + + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + + // final sanity check that the hub is still responsive + Result = Client.Get("status"); + CHECK(Result); + } +} + +TEST_SUITE_END(); + +TEST_CASE("hub.consul.lifecycle") +{ + zen::consul::ConsulProcess ConsulProc; + ConsulProc.SpawnConsulAgent(); + + zen::consul::ConsulClient Client("http://localhost:8500/"); + Client.SetKeyValue("zen/hub/testkey", "testvalue"); + + std::string RetrievedValue = Client.GetKeyValue("zen/hub/testkey"); + CHECK_EQ(RetrievedValue, "testvalue"); + + Client.DeleteKey("zen/hub/testkey"); + + ConsulProc.StopConsulAgent(); +} + +} // namespace zen::tests::hub +#endif diff --git a/src/zenserver/hub/README.md b/src/zenserver/hub/README.md new file mode 100644 index 000000000..322be3649 --- /dev/null +++ b/src/zenserver/hub/README.md @@ -0,0 +1,28 @@ +# Zen Server Hub + +The Zen Server can act in a "hub" mode. In this mode, the only services offered are the basic health +and diagnostic services alongside an API to provision and deprovision Storage server instances. + +## Generic Server API + +GET `/health` - returns an `OK!` payload when all enabled services are up and responding + +## Hub API + +GET `{moduleid}` - alphanumeric identifier to identify a dataset (typically associated with a content plug-in module) + +GET `/hub/status` - obtain a summary of the currently live instances + +GET `/hub/modules/{moduleid}` - retrieve information about a module + +POST `/hub/modules/{moduleid}/provision` - provision service for module + +POST `/hub/modules/{moduleid}/deprovision` - deprovision service for module + +GET `/hub/stats` - retrieve stats for service + +## Hub Configuration + +The hub service can use Consul to provide status updates + +The hub service can emit telemetry to an Open Telemetry collector diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp new file mode 100644 index 000000000..4d9da3a57 --- /dev/null +++ b/src/zenserver/hub/hubservice.cpp @@ -0,0 +1,867 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hubservice.h" + +#include "hydration.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/system.h> +#include <zenutil/zenserverprocess.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_vector.h> +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +/////////////////////////////////////////////////////////////////////////// + +/** + * A timeline of events with sequence IDs and timestamps. Used to + * track significant events for broadcasting to listeners. + */ +class EventTimeline +{ +public: + EventTimeline() { m_Events.reserve(1024); } + + ~EventTimeline() {} + + EventTimeline(const EventTimeline&) = delete; + EventTimeline& operator=(const EventTimeline&) = delete; + + void RecordEvent(std::string_view EventTag, CbObject EventMetadata) + { + const uint64_t SequenceId = m_NextEventId++; + const auto Now = std::chrono::steady_clock::now(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); + } + + struct EventRecord + { + uint64_t SequenceId; + std::string Tag; + std::chrono::steady_clock::time_point Timestamp; + CbObject EventMetadata; + + EventRecord(uint64_t InSequenceId, + std::string_view InTag, + std::chrono::steady_clock::time_point InTimestamp, + CbObject InEventMetadata = CbObject()) + : SequenceId(InSequenceId) + , Tag(InTag) + , Timestamp(InTimestamp) + , EventMetadata(InEventMetadata) + { + } + }; + + /** + * Iterate over events that have a SequenceId greater than SinceEventId + * + * @param Callback A callable that takes a const EventRecord& + * @param SinceEventId The SequenceId to compare against + */ + void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) + { + // Hold the lock for as short a time as possible + eastl::fixed_vector<EventRecord, 128> EventsToProcess; + m_Lock.WithSharedLock([&] { + for (auto& Event : m_Events) + { + if (Event.SequenceId > SinceEventId) + { + EventsToProcess.push_back(Event); + } + } + }); + + // Now invoke the callback outside the lock + for (auto& Event : EventsToProcess) + { + Callback(Event); + } + } + + /** + * Trim events up to (and including) the given SequenceId. Intended + * to be used for cleaning up events which are not longer interesting. + * + * @param UpToEventId The SequenceId up to which events should be removed + */ + void TrimEventsUpTo(uint64_t UpToEventId) + { + RwLock::ExclusiveLockScope _(m_Lock); + auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) { + return Event.SequenceId <= UpToEventId; + }); + m_Events.erase(It, m_Events.end()); + } + +private: + std::atomic<uint64_t> m_NextEventId{0}; + + RwLock m_Lock; + std::vector<EventRecord> m_Events; +}; + +////////////////////////////////////////////////////////////////////////// + +struct ResourceMetrics +{ + uint64_t DiskUsageBytes = 0; + uint64_t MemoryUsageBytes = 0; +}; + +/** + * Storage Server Instance + * + * This class manages the lifecycle of a storage server instance, and + * provides functions to query its state. There should be one instance + * per module ID. + */ +struct StorageServerInstance +{ + StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath); + ~StorageServerInstance(); + + void Provision(); + void Deprovision(); + + void Hibernate(); + void Wake(); + + const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } + + inline std::string_view GetModuleId() const { return m_ModuleId; } + inline bool IsProvisioned() const { return m_IsProvisioned.load(); } + + inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } + +private: + RwLock m_Lock; + std::string m_ModuleId; + std::atomic<bool> m_IsProvisioned{false}; + std::atomic<bool> m_IsHibernated{false}; + ZenServerInstance m_ServerInstance; + std::filesystem::path m_BaseDir; + std::filesystem::path m_TempDir; + std::filesystem::path m_HydrationPath; + ResourceMetrics m_ResourceMetrics; + + void SpawnServerProcess(); + + void Hydrate(); + void Dehydrate(); +}; + +StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath) +: m_ModuleId(ModuleId) +, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) +, m_HydrationPath(FileHydrationPath) +{ + m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); + m_TempDir = HydrationTempPath / ModuleId; +} + +StorageServerInstance::~StorageServerInstance() +{ +} + +void +StorageServerInstance::SpawnServerProcess() +{ + ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); + m_ServerInstance.SetDataDir(m_BaseDir); + const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady(); + + ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort); + + m_ServerInstance.EnableShutdownOnDestroy(); +} + +void +StorageServerInstance::Provision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_IsProvisioned) + { + ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + Wake(); + } + else + { + ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); + + Hydrate(); + + SpawnServerProcess(); + } + + m_IsProvisioned = true; +} + +void +StorageServerInstance::Deprovision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); + + m_ServerInstance.Shutdown(); + + Dehydrate(); + + m_IsProvisioned = false; +} + +void +StorageServerInstance::Hibernate() +{ + // Signal server to shut down, but keep data around for later wake + + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); + + return; + } + + if (!m_ServerInstance.IsRunning()) + { + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); + + // This is an unexpected state. Should consider the instance invalid? + + return; + } + + try + { + m_ServerInstance.Shutdown(); + + m_IsHibernated = true; + m_IsProvisioned = false; + + return; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + } +} + +void +StorageServerInstance::Wake() +{ + // Start server in-place using existing data + + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsHibernated) + { + ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); + + return; + } + + ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + try + { + SpawnServerProcess(); + m_IsHibernated = false; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + + // TODO: this instance should be marked as invalid + } +} + +void +StorageServerInstance::Hydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Hydrate(); +} + +void +StorageServerInstance::Dehydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Dehydrate(); +} + +////////////////////////////////////////////////////////////////////////// + +struct HttpHubService::Impl +{ + Impl(const Impl&) = delete; + Impl& operator=(const Impl&) = delete; + + Impl(); + ~Impl(); + + void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) + { + m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir); + m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); + ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); + + m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); + ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); + + // This is necessary to ensure the hub assigns a distinct port range. + // We need to do this primarily because otherwise automated tests will + // fail as the test runner will create processes in the default range. + // We should probably make this configurable or dynamic for maximum + // flexibility, and to allow running multiple hubs on the same host if + // necessary. + m_RunEnvironment.SetNextPortNumber(21000); + } + + void Cleanup() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_Instances.clear(); + } + + struct ProvisionedInstanceInfo + { + std::string BaseUri; + uint16_t Port; + }; + + /** + * Provision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to provision. + * @param OutInfo If successful, information about the provisioned instance will be returned here. + * @param OutReason If unsuccessful, the reason will be returned here. + */ + bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) + { + StorageServerInstance* Instance = nullptr; + bool IsNewInstance = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) + { + std::string Reason; + if (!CanProvisionInstance(ModuleId, /* out */ Reason)) + { + ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); + + OutReason = Reason; + + return false; + } + + IsNewInstance = true; + auto NewInstance = + std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); + Instance = NewInstance.get(); + m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); + + ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + } + else + { + Instance = It->second.get(); + } + + m_ProvisioningModules.emplace(std::string(ModuleId)); + } + + ZEN_ASSERT(Instance != nullptr); + + auto RemoveProvisioningModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProvisioningModules.erase(std::string(ModuleId)); + }); + + // NOTE: this is done while not holding the lock, as provisioning may take time + // and we don't want to block other operations. We track which modules are being + // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision + // those modules while in this state. + + UpdateStats(); + + try + { + Instance->Provision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + if (IsNewInstance) + { + // Clean up + RwLock::ExclusiveLockScope _(m_Lock); + m_Instances.erase(std::string(ModuleId)); + } + return false; + } + + OutInfo.Port = Instance->GetBasePort(); + + // TODO: base URI? Would need to know what host name / IP to use + + return true; + } + + /** + * Deprovision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to deprovision. + * @param OutReason If unsuccessful, the reason will be returned here. + * @return true if the instance was found and deprovisioned, false otherwise. + */ + bool Deprovision(const std::string& ModuleId, std::string& OutReason) + { + std::unique_ptr<StorageServerInstance> Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + { + OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); + + ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); + + return false; + } + + if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) + { + ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); + + // Not found, OutReason should be empty + return false; + } + else + { + Instance = std::move(It->second); + m_Instances.erase(It); + m_DeprovisioningModules.emplace(ModuleId); + } + } + + // The module is deprovisioned outside the lock to avoid blocking other operations. + // + // To ensure that no new provisioning can occur while we're deprovisioning, + // we add the module ID to m_DeprovisioningModules and remove it once + // deprovisioning is complete. + + auto _ = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(ModuleId); + }); + + Instance->Deprovision(); + + return true; + } + + /** + * Find a storage server instance for the given module ID. + * + * Beware that as this returns a raw pointer to the instance, the caller must ensure + * that the instance is not deprovisioned while in use. + * + * @param ModuleId The ID of the module to find. + * @param OutInstance If found, the instance will be returned here. + * @return true if the instance was found, false otherwise. + */ + bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr) + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + { + if (OutInstance) + { + *OutInstance = It->second.get(); + } + return true; + } + else if (OutInstance) + { + *OutInstance = nullptr; + } + return false; + } + + /** + * Enumerate all storage server instances. + * + * @param Callback The callback to invoke for each instance. Note that you should + * not do anything heavyweight in the callback as it is invoked while holding + * a shared lock. + */ + void EnumerateModules(auto&& Callback) + { + RwLock::SharedLockScope _(m_Lock); + for (auto& It : m_Instances) + { + Callback(*It.second); + } + } + + int GetInstanceCount() + { + RwLock::SharedLockScope _(m_Lock); + return gsl::narrow_cast<int>(m_Instances.size()); + } + + inline int GetInstanceLimit() { return m_InstanceLimit; } + inline int GetMaxInstanceCount() { return m_MaxInstanceCount; } + +private: + ZenServerEnvironment m_RunEnvironment; + std::filesystem::path m_FileHydrationPath; + std::filesystem::path m_HydrationTempPath; + RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; + std::unordered_set<std::string> m_DeprovisioningModules; + std::unordered_set<std::string> m_ProvisioningModules; + int m_MaxInstanceCount = 0; + void UpdateStats(); + + // Capacity tracking + + int m_InstanceLimit = 1000; + ResourceMetrics m_ResourceLimits; + SystemMetrics m_HostMetrics; + + void UpdateCapacityMetrics(); + bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); +}; + +HttpHubService::Impl::Impl() +{ + m_HostMetrics = zen::GetSystemMetrics(); + m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; + m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; +} + +HttpHubService::Impl::~Impl() +{ + try + { + ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + m_Lock.WithExclusiveLock([this] { + for (auto& [ModuleId, Instance] : m_Instances) + { + Instance->Deprovision(); + } + m_Instances.clear(); + }); + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} + +void +HttpHubService::Impl::UpdateCapacityMetrics() +{ + m_HostMetrics = zen::GetSystemMetrics(); + + // Update per-instance metrics +} + +void +HttpHubService::Impl::UpdateStats() +{ + m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); +} + +bool +HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) +{ + if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); + + return false; + } + + if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); + + return false; + } + + if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) + { + OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); + + return false; + } + + // TODO: handle additional resource metrics + + return true; +} + +/////////////////////////////////////////////////////////////////////////// + +HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique<Impl>()) +{ + using namespace std::literals; + + m_Impl->Initialize(HubBaseDir, ChildBaseDir); + + m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool { + for (const auto C : Str) + { + if (std::isalnum(C) || C == '-') + { + // fine + } + else + { + // not fine + return false; + } + } + + return true; + }); + + m_Router.RegisterRoute( + "status", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj.BeginArray("modules"); + m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) { + Obj.BeginObject(); + Obj << "moduleId" << Instance.GetModuleId(); + Obj << "provisioned" << Instance.IsProvisioned(); + Obj.EndObject(); + }); + Obj.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "modules/{moduleid}", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + + if (Req.ServerRequest().RequestVerb() == HttpVerb::kDelete) + { + HandleModuleDelete(Req.ServerRequest(), ModuleId); + } + else + { + HandleModuleGet(Req.ServerRequest(), ModuleId); + } + }, + HttpVerb::kGet | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "modules/{moduleid}/provision", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + + std::string FailureReason = "unknown"; + HttpResponseCode ResponseCode = HttpResponseCode::OK; + + try + { + Impl::ProvisionedInstanceInfo Info; + if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) + { + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + Obj << "baseUri" << Info.BaseUri; + Obj << "port" << Info.Port; + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + + return; + } + else + { + ResponseCode = HttpResponseCode::BadRequest; + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + ResponseCode = HttpResponseCode::InternalServerError; + } + + Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "modules/{moduleid}/deprovision", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + std::string FailureReason = "unknown"; + + try + { + if (!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason)) + { + if (FailureReason.empty()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + else + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); + } + } + + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "stats", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj << "currentInstanceCount" << m_Impl->GetInstanceCount(); + Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount(); + Obj << "instanceLimit" << m_Impl->GetInstanceLimit(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kGet); +} + +HttpHubService::~HttpHubService() +{ +} + +const char* +HttpHubService::BaseUri() const +{ + return "/hub/"; +} + +void +HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId) +{ + ZEN_UNUSED(UpstreamNotificationEndpoint, InstanceId); + // TODO: store these for use in notifications, on some interval/criteria which is currently TBD +} + +void +HttpHubService::HandleRequest(zen::HttpServerRequest& Request) +{ + m_Router.HandleRequest(Request); +} + +void +HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) +{ + StorageServerInstance* Instance = nullptr; + if (!m_Impl->Find(ModuleId, &Instance)) + { + Request.WriteResponse(HttpResponseCode::NotFound); + return; + } + + CbObjectWriter Obj; + Obj << "moduleId" << Instance->GetModuleId(); + Obj << "provisioned" << Instance->IsProvisioned(); + Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); +} + +void +HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) +{ + StorageServerInstance* Instance = nullptr; + if (!m_Impl->Find(ModuleId, &Instance)) + { + Request.WriteResponse(HttpResponseCode::NotFound); + return; + } + + // TODO: deprovision and nuke all related storage + + CbObjectWriter Obj; + Obj << "moduleId" << Instance->GetModuleId(); + Obj << "provisioned" << Instance->IsProvisioned(); + Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); +} + +} // namespace zen diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h new file mode 100644 index 000000000..1a5a8c57c --- /dev/null +++ b/src/zenserver/hub/hubservice.h @@ -0,0 +1,42 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include "hydration.h" + +namespace zen { + +/** ZenServer Hub Service + * + * Manages a set of storage servers on the behalf of external clients. For + * use in UEFN content worker style scenarios. + * + */ +class HttpHubService : public zen::HttpService +{ +public: + HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir); + ~HttpHubService(); + + HttpHubService(const HttpHubService&) = delete; + HttpHubService& operator=(const HttpHubService&) = delete; + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + + void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId); + +private: + HttpRequestRouter m_Router; + + struct Impl; + + std::unique_ptr<Impl> m_Impl; + + void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); + void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); +}; + +} // namespace zen diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp new file mode 100644 index 000000000..52c17fe1a --- /dev/null +++ b/src/zenserver/hub/hydration.cpp @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hydration.h" + +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> + +namespace zen { + +/////////////////////////////////////////////////////////////////////////// + +struct FileHydrator : public HydrationStrategyBase +{ + virtual void Configure(const HydrationConfig& Config) override; + virtual void Hydrate() override; + virtual void Dehydrate() override; + +private: + HydrationConfig m_Config; + std::filesystem::path m_StorageModuleRootDir; +}; + +void +FileHydrator::Configure(const HydrationConfig& Config) +{ + m_Config = Config; + + std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification)); + + if (!std::filesystem::exists(ConfigPath)) + { + throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); + } + + m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; + + CreateDirectories(m_StorageModuleRootDir); +} + +void +FileHydrator::Hydrate() +{ + ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + + // Ensure target is clean + ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); + const bool ForceRemoveReadOnlyFiles = true; + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + + bool WipeServerState = false; + + try + { + ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); + } + catch (std::exception& Ex) + { + ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); + + // We don't do the clean right here to avoid potentially running into double-throws + WipeServerState = true; + } + + if (WipeServerState) + { + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + } + + // Note that we leave the storage state intact until next dehydration replaces the content +} + +void +FileHydrator::Dehydrate() +{ + ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + + const std::filesystem::path TargetDir = m_StorageModuleRootDir; + + // Ensure target is clean. This could be replaced with an atomic copy at a later date + // (i.e copy into a temporary directory name and rename it once complete) + + ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); + const bool ForceRemoveReadOnlyFiles = true; + CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + + bool CopySuccess = true; + + try + { + ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); + CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true}); + } + catch (std::exception& Ex) + { + ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); + + // We don't do the clean right here to avoid potentially running into double-throws + CopySuccess = false; + } + + if (!CopySuccess) + { + ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); + CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + } + + ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); +} + +std::unique_ptr<HydrationStrategyBase> +CreateFileHydrator() +{ + return std::make_unique<FileHydrator>(); +} + +} // namespace zen diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h new file mode 100644 index 000000000..f86f2accf --- /dev/null +++ b/src/zenserver/hub/hydration.h @@ -0,0 +1,40 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenhubserver.h" + +namespace zen { + +struct HydrationConfig +{ + // Location of server state to hydrate/dehydrate + std::filesystem::path ServerStateDir; + // Temporary directory available for use during hydration/dehydration + std::filesystem::path TempDir; + // Module ID of the server state being hydrated/dehydrated + std::string ModuleId; + // Back-end specific target specification (e.g. S3 bucket, file path, etc) + std::string TargetSpecification; +}; + +/** + * @brief State hydration strategy interface + * + * An instance of this interface is used to perform hydration OR + * dehydration of server state. It's expected to be used only once + * and not reused. + * + */ +struct HydrationStrategyBase +{ + virtual ~HydrationStrategyBase() = default; + + virtual void Dehydrate() = 0; + virtual void Hydrate() = 0; + virtual void Configure(const HydrationConfig& Config) = 0; +}; + +std::unique_ptr<HydrationStrategyBase> CreateFileHydrator(); + +} // namespace zen diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp new file mode 100644 index 000000000..7a4ba951d --- /dev/null +++ b/src/zenserver/hub/zenhubserver.cpp @@ -0,0 +1,303 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenhubserver.h" +#include "hubservice.h" + +#include <zencore/fmtutils.h> +#include <zencore/memory/llm.h> +#include <zencore/memory/memorytrace.h> +#include <zencore/memory/tagtrace.h> +#include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> +#include <zencore/system.h> +#include <zencore/windows.h> +#include <zenhttp/httpapiservice.h> +#include <zenutil/service.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +void +ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("hub", + "", + "upstream-notification-endpoint", + "Endpoint URL for upstream notifications", + cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), + ""); + + Options.add_option("hub", + "", + "instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), + ""); +} + +void +ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenHubServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +void +ZenHubServerConfigurator::ValidateOptions() +{ +} + +/////////////////////////////////////////////////////////////////////////// + +ZenHubServer::ZenHubServer() +{ +} + +ZenHubServer::~ZenHubServer() +{ + Cleanup(); +} + +int +ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenHubServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + // This is a workaround to make sure we can have automated tests. Without + // this the ranges for different child zen hub processes could overlap with + // the main test range. + ZenServerEnvironment::SetBaseChildId(1000); + + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; + + InitializeState(ServerConfig); + InitializeServices(ServerConfig); + RegisterServices(ServerConfig); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenHubServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenStorageServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +void +ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); +} + +void +ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); + + ZEN_INFO("instantiating API service"); + m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); + + ZEN_INFO("instantiating hub service"); + m_HubService = std::make_unique<HttpHubService>(ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"); + m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); +} + +void +ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); + + if (m_HubService) + { + m_Http->RegisterService(*m_HubService); + } + + if (m_ApiService) + { + m_Http->RegisterService(*m_ApiService); + } +} + +void +ZenHubServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO(R"(__________ ___ ___ ___. )" "\n" + R"(\____ /____ ____ / | \ __ _\_ |__ )" "\n" + R"( / // __ \ / \ / ~ \ | \ __ \ )" "\n" + R"( / /\ ___/| | \ \ Y / | / \_\ \)" "\n" + R"(/_______ \___ >___| / \___|_ /|____/|___ /)" "\n" + R"( \/ \/ \/ \/ \/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as HUB (pid: {})", GetCurrentProcessId()); + +#if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +#endif + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +#endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +////////////////////////////////////////////////////////////////////////////////// + +ZenHubServerMain::ZenHubServerMain(ZenHubServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) +{ +} + +void +ZenHubServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenHubServer Server; + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + // Server.Initialize has already logged what the issue is - just exit with failure code here. + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h new file mode 100644 index 000000000..ac14362f0 --- /dev/null +++ b/src/zenserver/hub/zenhubserver.h @@ -0,0 +1,92 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenserver.h" + +namespace cxxopts { +class Options; +} +namespace zen::LuaConfig { +struct Options; +} + +namespace zen { + +class HttpApiService; +class HttpHubService; + +struct ZenHubServerConfig : public ZenServerConfig +{ + std::string UpstreamNotificationEndpoint; + std::string InstanceId; // For use in notifications +}; + +struct ZenHubServerConfigurator : public ZenServerConfiguratorBase +{ + ZenHubServerConfigurator(ZenHubServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions), m_ServerOptions(ServerOptions) + { + } + + ~ZenHubServerConfigurator() = default; + +private: + virtual void AddCliOptions(cxxopts::Options& Options) override; + virtual void AddConfigOptions(LuaConfig::Options& Options) override; + virtual void ApplyOptions(cxxopts::Options& Options) override; + virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override; + virtual void ValidateOptions() override; + + ZenHubServerConfig& m_ServerOptions; +}; + +class ZenHubServerMain : public ZenServerMain +{ +public: + ZenHubServerMain(ZenHubServerConfig& ServerOptions); + virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override; + + ZenHubServerMain(const ZenHubServerMain&) = delete; + ZenHubServerMain& operator=(const ZenHubServerMain&) = delete; + + typedef ZenHubServerConfig Config; + typedef ZenHubServerConfigurator Configurator; + +private: + ZenHubServerConfig& m_ServerOptions; +}; + +class ZenHubServer : public ZenServerBase +{ + ZenHubServer& operator=(ZenHubServer&&) = delete; + ZenHubServer(ZenHubServer&&) = delete; + +public: + ZenHubServer(); + ~ZenHubServer(); + + int Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry); + void Run(); + void Cleanup(); + + void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetTestMode(bool State) { m_TestMode = State; } + void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + +private: + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; + bool m_DebugOptionForcedCrash = false; + + std::unique_ptr<HttpHubService> m_HubService; + std::unique_ptr<HttpApiService> m_ApiService; + + void InitializeState(const ZenHubServerConfig& ServerConfig); + void InitializeServices(const ZenHubServerConfig& ServerConfig); + void RegisterServices(const ZenHubServerConfig& ServerConfig); +}; + +} // namespace zen diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 78bce7d06..996f96da8 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -25,6 +25,8 @@ #include "storage/storageconfig.h" #include "storage/zenstorageserver.h" +#include "hub/zenhubserver.h" + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> # include <zenutil/windows/windowsservice.h> @@ -273,6 +275,8 @@ main(int argc, char* argv[]) exit(5); #endif break; + case kHub: + return AppMain<ZenHubServerMain>(argc, argv); default: case kStore: return AppMain<ZenStorageServerMain>(argc, argv); diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index fb65fa949..6ee80dc62 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -22,6 +22,7 @@ target("zenserver") add_deps("sol2") add_packages("json11") add_packages("lua") + add_packages("consul") if has_config("zenmimalloc") then add_packages("mimalloc") @@ -55,10 +56,7 @@ target("zenserver") add_ldflags("-framework Security") add_ldflags("-framework SystemConfiguration") end - - add_options("compute") - add_options("exec") - + -- to work around some unfortunate Ctrl-C behaviour on Linux/Mac due to -- our use of setsid() at startup we pass in `--no-detach` to zenserver -- ensure that it recieves signals when the user requests termination @@ -87,17 +85,60 @@ target("zenserver") end end) + after_build(function (target) - if has_config("zensentry") then - local crashpad_handler = "crashpad_handler" - if is_plat("windows") then - crashpad_handler = "crashpad_handler.exe" + local function copy_if_newer(src_file, dst_file, file_description) + if not os.exists(src_file) then + print("Source file '" .. src_file .. "' does not exist, cannot copy " .. file_description) + return end + local should_copy = false + if not os.exists(dst_file) then + should_copy = true + else + local src_size = os.filesize(src_file) + local dst_size = os.filesize(dst_file) + local src_mtime = os.mtime(src_file) + local dst_mtime = os.mtime(dst_file) + + if src_size ~= dst_size or src_mtime > dst_mtime then + should_copy = true + end + end + + if should_copy then + os.cp(src_file, dst_file) + print("Copied '" .. file_description .. "' to output directory") + end + end + + if has_config("zensentry") then local pkg = target:pkg("sentry-native") if pkg then local installdir = pkg:installdir() - os.cp(path.join(installdir, "bin/" .. crashpad_handler), target:targetdir()) - print("Copied " .. crashpad_handler .. " to output directory") + + local crashpad_handler = "crashpad_handler" + if is_plat("windows") then + crashpad_handler = "crashpad_handler.exe" + end + + local crashpad_handler_path = path.join(installdir, "bin/" .. crashpad_handler) + copy_if_newer(crashpad_handler_path, path.join(target:targetdir(), crashpad_handler), crashpad_handler) + + if is_plat("windows") then + local crashpad_wer_path = path.join(installdir, "bin/crashpad_wer.dll") + copy_if_newer(crashpad_wer_path, path.join(target:targetdir(), "crashpad_wer.dll"), "crashpad_wer.dll") + end + end + end + + local consul_pkg = target:pkg("consul") + if consul_pkg then + local installdir = consul_pkg:installdir() + local consul_bin = "consul" + if is_plat("windows") then + consul_bin = "consul.exe" end + copy_if_newer(path.join(installdir, "bin", consul_bin), path.join(target:targetdir(), consul_bin), consul_bin) end end) |