diff options
| author | Stefan Boberg <[email protected]> | 2026-01-21 09:38:16 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-01-21 09:38:16 +0100 |
| commit | e8d162c293fbdf9a40a1369b60b80fa286aceb0f (patch) | |
| tree | 0cdb2a913e5f4d6f2c151f36edba8fd5b2ca4f89 | |
| parent | builds multipart upload (#722) (diff) | |
| download | zen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.tar.xz zen-e8d162c293fbdf9a40a1369b60b80fa286aceb0f.zip | |
zen hub (#574)
Initial implementation of zenserver "hub" mode. This is an experimental feature.
zenserver can be started in hub mode by specifying `hub` as the first argument to zenserver
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | docs/NOTES.md | 8 | ||||
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 252 | ||||
| -rw-r--r-- | src/zenserver/hub/README.md | 28 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.cpp | 867 | ||||
| -rw-r--r-- | src/zenserver/hub/hubservice.h | 42 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 119 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 40 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 303 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 92 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/xmake.lua | 61 | ||||
| -rw-r--r-- | thirdparty/xmake.lua | 2 | ||||
| -rw-r--r-- | xmake.lua | 9 |
14 files changed, 1813 insertions, 15 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e87d1557..e1fc4be0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - `.zen-tmp` - Feature: Added `--exclude-extensions` to `zen upload` and `zen diff` to set the file extensions to exclude. Each extension name is separated with ';' or ',' - Feature: Added `--result-path` option to `zen builds ls` to output structured to a file +- Feature: Added experimental zenserver "hub" mode which is used to manage a set of zenserver instances on a host - Improvement: On Windows, the asio HTTP back-end now uses TransmitFile to send data directly from files instead of using memory mapping. - Improvement: Optimized scavenge lookup performance - Improvement: Enable limit-overwrite behavior by default diff --git a/docs/NOTES.md b/docs/NOTES.md index 6edfdfee5..76476b0ea 100644 --- a/docs/NOTES.md +++ b/docs/NOTES.md @@ -10,3 +10,11 @@ We’ll likely want to *not* use `mimalloc` by default due to memory overheads, `doctest` has some thread local state which can unfortunately end up running after the main thread has exited and torn everything down. When it tries to free memory after main has exited things go bad. Currently this mostly ends up being an issue when running tests in the debugger. Some heuristics have been implemented to try and wait for all threads to exit before continuing shutting down but it does not feel like a proper solution. +# Hub + +## Data Obliteration + +We need to support data obliteration on a module level. This means removing any local state for a given +module id and also any cold data. + +Add ability to register service with Consul via REST API diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp new file mode 100644 index 000000000..42a5dcae4 --- /dev/null +++ b/src/zenserver-test/hub-tests.cpp @@ -0,0 +1,252 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
#if ZEN_WITH_TESTS
#	include "zenserver-test.h"
#	include <zencore/testing.h>
#	include <zencore/testutils.h>
#	include <zencore/workthreadpool.h>
#	include <zencore/compactbinarybuilder.h>
#	include <zencore/compactbinarypackage.h>
#	include <zencore/compress.h>
#	include <zencore/filesystem.h>
#	include <zencore/stream.h>
#	include <zencore/string.h>
#	include <zencore/fmtutils.h>
#	include <zencore/scopeguard.h>
#	include <zenhttp/packageformat.h>
#	include <zenremotestore/builds/buildstoragecache.h>
#	include <zenutil/workerpools.h>
#	include <zenutil/zenserverprocess.h>
#	include <zenhttp/httpclient.h>
#	include <zenutil/consul.h>

namespace zen::tests::hub {

using namespace std::literals;

namespace {

	// Cache keys written during the first and the second provisioning round respectively
	constexpr const char* kFirstBlobUri	 = "/z$/ns1/b/0123456789abcdef0123456789abcdef01234567";
	constexpr const char* kSecondBlobUri = "/z$/ns1/b/1123456789abcdef0123456789abcdef01234567";

	/**
	 * Ask the hub to provision ModuleId and return the port reported in the response.
	 *
	 * A failed request or a zero port aborts the test case (REQUIRE): the callers
	 * immediately connect to the returned port, so carrying on would only produce
	 * follow-up failures against "localhost:0".
	 */
	uint16_t ProvisionModule(HttpClient& HubClient, std::string_view ModuleId)
	{
		HttpClient::Response Result = HubClient.Post(fmt::format("modules/{}/provision", ModuleId));
		REQUIRE(Result);

		CbObject Info = Result.AsObject();
		CHECK(Info["moduleId"].AsString() == ModuleId);

		const uint16_t Port = Info["port"].AsUInt16(0);
		REQUIRE_NE(Port, 0);

		return Port;
	}

	/** Ask the hub to deprovision ModuleId; the request must succeed. */
	void DeprovisionModule(HttpClient& HubClient, std::string_view ModuleId)
	{
		HttpClient::Response Result = HubClient.Post(fmt::format("modules/{}/deprovision", ModuleId));
		REQUIRE(Result);
	}

	/** Base URI of a provisioned storage server listening on Port. */
	std::string ModuleBaseUri(uint16_t Port) { return fmt::format("http://localhost:{}", Port); }

	/** Expect a 404 for Uri, i.e. the instance does not hold that value. */
	void ExpectMissing(HttpClient& Client, const char* Uri)
	{
		HttpClient::Response Result = Client.Get(Uri);
		CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
	}

	/** Expect Uri to be served with exactly the Expected payload. */
	void ExpectBlob(HttpClient& Client, const char* Uri, std::string_view Expected)
	{
		HttpClient::Response Result = Client.Get(Uri);
		CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
		CHECK_EQ(Result.AsText(), Expected);
	}

	/** PUT Payload at Uri and expect the server to report it as newly created. */
	void StoreBlob(HttpClient& Client, const char* Uri, std::string_view Payload)
	{
		HttpClient::Response Result = Client.Put(Uri, IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload)));
		CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
	}

}  // namespace

TEST_SUITE_BEGIN("hub.lifecycle");

// The hub starts up and answers on its status endpoint
TEST_CASE("hub.lifecycle.basic")
{
	{
		ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);

		const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady();
		CHECK(PortNumber != 0);

		HttpClient Client(Instance.GetBaseUri() + "/hub/");

		HttpClient::Response Result = Client.Get("status");
		CHECK(Result);
	}
}

// Modules can be provisioned and deprovisioned repeatedly, and the content written
// to a module survives each deprovision/provision round trip
TEST_CASE("hub.lifecycle.children")
{
	ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);

	const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady();
	REQUIRE(PortNumber != 0);

	SUBCASE("spawn")
	{
		HttpClient Client(Instance.GetBaseUri() + "/hub/");

		HttpClient::Response Result = Client.Get("status");
		REQUIRE(Result);

		// Round 1: both modules should be fresh instances with no contents
		{
			HttpClient AbcClient(ModuleBaseUri(ProvisionModule(Client, "abc"sv)));

			ExpectMissing(AbcClient, kFirstBlobUri);
			StoreBlob(AbcClient, kFirstBlobUri, "abcdef"sv);
		}

		{
			HttpClient DefClient(ModuleBaseUri(ProvisionModule(Client, "def"sv)));

			ExpectMissing(DefClient, kFirstBlobUri);
			StoreBlob(DefClient, kFirstBlobUri, "AbcDef"sv);
		}

		// this should be rejected because of the invalid module id
		Result = Client.Post("modules/!!!!!/provision");
		CHECK(!Result);

		Result = Client.Post("modules/ghi/provision");
		REQUIRE(Result);

		// Tear down instances

		DeprovisionModule(Client, "abc"sv);
		DeprovisionModule(Client, "def"sv);
		DeprovisionModule(Client, "ghi"sv);

		// Round 2: re-provision to verify that (de)hydration preserved state
		{
			HttpClient AbcClient(ModuleBaseUri(ProvisionModule(Client, "abc"sv)));

			ExpectBlob(AbcClient, kFirstBlobUri, "abcdef"sv);
			StoreBlob(AbcClient, kSecondBlobUri, "ghijklmnop"sv);
		}

		{
			HttpClient DefClient(ModuleBaseUri(ProvisionModule(Client, "def"sv)));

			ExpectBlob(DefClient, kFirstBlobUri, "AbcDef"sv);
			StoreBlob(DefClient, kSecondBlobUri, "GhijklmNop"sv);
		}

		DeprovisionModule(Client, "abc"sv);
		DeprovisionModule(Client, "def"sv);

		// Round 3: re-provision to verify that (de)hydration preserved state, including
		// state which was generated after the very first dehydration
		{
			HttpClient AbcClient(ModuleBaseUri(ProvisionModule(Client, "abc"sv)));

			ExpectBlob(AbcClient, kFirstBlobUri, "abcdef"sv);
			ExpectBlob(AbcClient, kSecondBlobUri, "ghijklmnop"sv);
		}

		{
			HttpClient DefClient(ModuleBaseUri(ProvisionModule(Client, "def"sv)));

			ExpectBlob(DefClient, kFirstBlobUri, "AbcDef"sv);
			ExpectBlob(DefClient, kSecondBlobUri, "GhijklmNop"sv);
		}

		DeprovisionModule(Client, "abc"sv);
		DeprovisionModule(Client, "def"sv);

		// final sanity check that the hub is still responsive
		Result = Client.Get("status");
		CHECK(Result);
	}
}

TEST_SUITE_END();

// Round-trips a key/value pair through a locally spawned Consul agent
TEST_CASE("hub.consul.lifecycle")
{
	zen::consul::ConsulProcess ConsulProc;
	ConsulProc.SpawnConsulAgent();

	zen::consul::ConsulClient Client("http://localhost:8500/");
	Client.SetKeyValue("zen/hub/testkey", "testvalue");

	std::string RetrievedValue = Client.GetKeyValue("zen/hub/testkey");
	CHECK_EQ(RetrievedValue, "testvalue");

	Client.DeleteKey("zen/hub/testkey");

	ConsulProc.StopConsulAgent();
}

}  // namespace zen::tests::hub
#endif
+ +## Generic Server API + +GET `/health` - returns an `OK!` payload when all enabled services are up and responding + +## Hub API + +`{moduleid}` - alphanumeric identifier (hyphens are also accepted) used to identify a dataset (typically associated with a content plug-in module) + +GET `/hub/status` - obtain a summary of the currently live instances + +GET `/hub/modules/{moduleid}` - retrieve information about a module + +POST `/hub/modules/{moduleid}/provision` - provision service for module + +POST `/hub/modules/{moduleid}/deprovision` - deprovision service for module + +GET `/hub/stats` - retrieve stats for service + +## Hub Configuration + +The hub service can use Consul to provide status updates + +The hub service can emit telemetry to an Open Telemetry collector diff --git a/src/zenserver/hub/hubservice.cpp b/src/zenserver/hub/hubservice.cpp new file mode 100644 index 000000000..4d9da3a57 --- /dev/null +++ b/src/zenserver/hub/hubservice.cpp @@ -0,0 +1,867 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hubservice.h" + +#include "hydration.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/system.h> +#include <zenutil/zenserverprocess.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <EASTL/fixed_vector.h> +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +/////////////////////////////////////////////////////////////////////////// + +/** + * A timeline of events with sequence IDs and timestamps. Used to + * track significant events for broadcasting to listeners. 
+ */ +class EventTimeline +{ +public: + EventTimeline() { m_Events.reserve(1024); } + + ~EventTimeline() {} + + EventTimeline(const EventTimeline&) = delete; + EventTimeline& operator=(const EventTimeline&) = delete; + + void RecordEvent(std::string_view EventTag, CbObject EventMetadata) + { + const uint64_t SequenceId = m_NextEventId++; + const auto Now = std::chrono::steady_clock::now(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); + } + + struct EventRecord + { + uint64_t SequenceId; + std::string Tag; + std::chrono::steady_clock::time_point Timestamp; + CbObject EventMetadata; + + EventRecord(uint64_t InSequenceId, + std::string_view InTag, + std::chrono::steady_clock::time_point InTimestamp, + CbObject InEventMetadata = CbObject()) + : SequenceId(InSequenceId) + , Tag(InTag) + , Timestamp(InTimestamp) + , EventMetadata(InEventMetadata) + { + } + }; + + /** + * Iterate over events that have a SequenceId greater than SinceEventId + * + * @param Callback A callable that takes a const EventRecord& + * @param SinceEventId The SequenceId to compare against + */ + void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) + { + // Hold the lock for as short a time as possible + eastl::fixed_vector<EventRecord, 128> EventsToProcess; + m_Lock.WithSharedLock([&] { + for (auto& Event : m_Events) + { + if (Event.SequenceId > SinceEventId) + { + EventsToProcess.push_back(Event); + } + } + }); + + // Now invoke the callback outside the lock + for (auto& Event : EventsToProcess) + { + Callback(Event); + } + } + + /** + * Trim events up to (and including) the given SequenceId. Intended + * to be used for cleaning up events which are not longer interesting. 
+ * + * @param UpToEventId The SequenceId up to which events should be removed + */ + void TrimEventsUpTo(uint64_t UpToEventId) + { + RwLock::ExclusiveLockScope _(m_Lock); + auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) { + return Event.SequenceId <= UpToEventId; + }); + m_Events.erase(It, m_Events.end()); + } + +private: + std::atomic<uint64_t> m_NextEventId{0}; + + RwLock m_Lock; + std::vector<EventRecord> m_Events; +}; + +////////////////////////////////////////////////////////////////////////// + +struct ResourceMetrics +{ + uint64_t DiskUsageBytes = 0; + uint64_t MemoryUsageBytes = 0; +}; + +/** + * Storage Server Instance + * + * This class manages the lifecycle of a storage server instance, and + * provides functions to query its state. There should be one instance + * per module ID. + */ +struct StorageServerInstance +{ + StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath); + ~StorageServerInstance(); + + void Provision(); + void Deprovision(); + + void Hibernate(); + void Wake(); + + const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } + + inline std::string_view GetModuleId() const { return m_ModuleId; } + inline bool IsProvisioned() const { return m_IsProvisioned.load(); } + + inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } + +private: + RwLock m_Lock; + std::string m_ModuleId; + std::atomic<bool> m_IsProvisioned{false}; + std::atomic<bool> m_IsHibernated{false}; + ZenServerInstance m_ServerInstance; + std::filesystem::path m_BaseDir; + std::filesystem::path m_TempDir; + std::filesystem::path m_HydrationPath; + ResourceMetrics m_ResourceMetrics; + + void SpawnServerProcess(); + + void Hydrate(); + void Dehydrate(); +}; + +StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, + std::string_view 
ModuleId, + std::filesystem::path FileHydrationPath, + std::filesystem::path HydrationTempPath) +: m_ModuleId(ModuleId) +, m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) +, m_HydrationPath(FileHydrationPath) +{ + m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); + m_TempDir = HydrationTempPath / ModuleId; +} + +StorageServerInstance::~StorageServerInstance() +{ +} + +void +StorageServerInstance::SpawnServerProcess() +{ + ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); + m_ServerInstance.SetDataDir(m_BaseDir); + const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady(); + + ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort); + + m_ServerInstance.EnableShutdownOnDestroy(); +} + +void +StorageServerInstance::Provision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_IsProvisioned) + { + ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + Wake(); + } + else + { + ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); + + Hydrate(); + + SpawnServerProcess(); + } + + m_IsProvisioned = true; +} + +void +StorageServerInstance::Deprovision() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); + + m_ServerInstance.Shutdown(); + + Dehydrate(); + + m_IsProvisioned = false; +} + +void +StorageServerInstance::Hibernate() +{ + // Signal server to shut down, but keep data around for later wake + + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsProvisioned) + { + 
ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); + + return; + } + + if (m_IsHibernated) + { + ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); + + return; + } + + if (!m_ServerInstance.IsRunning()) + { + ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); + + // This is an unexpected state. Should consider the instance invalid? + + return; + } + + try + { + m_ServerInstance.Shutdown(); + + m_IsHibernated = true; + m_IsProvisioned = false; + + return; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + } +} + +void +StorageServerInstance::Wake() +{ + // Start server in-place using existing data + + RwLock::ExclusiveLockScope _(m_Lock); + + if (!m_IsHibernated) + { + ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); + + return; + } + + ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); + + try + { + SpawnServerProcess(); + m_IsHibernated = false; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); + + // TODO: this instance should be marked as invalid + } +} + +void +StorageServerInstance::Hydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Hydrate(); +} + +void +StorageServerInstance::Dehydrate() +{ + HydrationConfig Config{.ServerStateDir = m_BaseDir, + .TempDir = m_TempDir, + .ModuleId = m_ModuleId, + .TargetSpecification = 
WideToUtf8(m_HydrationPath.native())}; + + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateFileHydrator(); + + Hydrator->Configure(Config); + Hydrator->Dehydrate(); +} + +////////////////////////////////////////////////////////////////////////// + +struct HttpHubService::Impl +{ + Impl(const Impl&) = delete; + Impl& operator=(const Impl&) = delete; + + Impl(); + ~Impl(); + + void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) + { + m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir); + m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); + ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); + + m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); + ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); + + // This is necessary to ensure the hub assigns a distinct port range. + // We need to do this primarily because otherwise automated tests will + // fail as the test runner will create processes in the default range. + // We should probably make this configurable or dynamic for maximum + // flexibility, and to allow running multiple hubs on the same host if + // necessary. + m_RunEnvironment.SetNextPortNumber(21000); + } + + void Cleanup() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_Instances.clear(); + } + + struct ProvisionedInstanceInfo + { + std::string BaseUri; + uint16_t Port; + }; + + /** + * Provision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to provision. + * @param OutInfo If successful, information about the provisioned instance will be returned here. + * @param OutReason If unsuccessful, the reason will be returned here. 
+ */ + bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) + { + StorageServerInstance* Instance = nullptr; + bool IsNewInstance = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) + { + std::string Reason; + if (!CanProvisionInstance(ModuleId, /* out */ Reason)) + { + ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); + + OutReason = Reason; + + return false; + } + + IsNewInstance = true; + auto NewInstance = + std::make_unique<StorageServerInstance>(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); + Instance = NewInstance.get(); + m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); + + ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); + } + else + { + Instance = It->second.get(); + } + + m_ProvisioningModules.emplace(std::string(ModuleId)); + } + + ZEN_ASSERT(Instance != nullptr); + + auto RemoveProvisioningModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProvisioningModules.erase(std::string(ModuleId)); + }); + + // NOTE: this is done while not holding the lock, as provisioning may take time + // and we don't want to block other operations. We track which modules are being + // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision + // those modules while in this state. + + UpdateStats(); + + try + { + Instance->Provision(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + if (IsNewInstance) + { + // Clean up + RwLock::ExclusiveLockScope _(m_Lock); + m_Instances.erase(std::string(ModuleId)); + } + return false; + } + + OutInfo.Port = Instance->GetBasePort(); + + // TODO: base URI? 
Would need to know what host name / IP to use + + return true; + } + + /** + * Deprovision a storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to deprovision. + * @param OutReason If unsuccessful, the reason will be returned here. + * @return true if the instance was found and deprovisioned, false otherwise. + */ + bool Deprovision(const std::string& ModuleId, std::string& OutReason) + { + std::unique_ptr<StorageServerInstance> Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + { + OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); + + ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); + + return false; + } + + if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) + { + ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); + + // Not found, OutReason should be empty + return false; + } + else + { + Instance = std::move(It->second); + m_Instances.erase(It); + m_DeprovisioningModules.emplace(ModuleId); + } + } + + // The module is deprovisioned outside the lock to avoid blocking other operations. + // + // To ensure that no new provisioning can occur while we're deprovisioning, + // we add the module ID to m_DeprovisioningModules and remove it once + // deprovisioning is complete. + + auto _ = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(ModuleId); + }); + + Instance->Deprovision(); + + return true; + } + + /** + * Find a storage server instance for the given module ID. + * + * Beware that as this returns a raw pointer to the instance, the caller must ensure + * that the instance is not deprovisioned while in use. + * + * @param ModuleId The ID of the module to find. + * @param OutInstance If found, the instance will be returned here. 
+ * @return true if the instance was found, false otherwise. + */ + bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr) + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) + { + if (OutInstance) + { + *OutInstance = It->second.get(); + } + return true; + } + else if (OutInstance) + { + *OutInstance = nullptr; + } + return false; + } + + /** + * Enumerate all storage server instances. + * + * @param Callback The callback to invoke for each instance. Note that you should + * not do anything heavyweight in the callback as it is invoked while holding + * a shared lock. + */ + void EnumerateModules(auto&& Callback) + { + RwLock::SharedLockScope _(m_Lock); + for (auto& It : m_Instances) + { + Callback(*It.second); + } + } + + int GetInstanceCount() + { + RwLock::SharedLockScope _(m_Lock); + return gsl::narrow_cast<int>(m_Instances.size()); + } + + inline int GetInstanceLimit() { return m_InstanceLimit; } + inline int GetMaxInstanceCount() { return m_MaxInstanceCount; } + +private: + ZenServerEnvironment m_RunEnvironment; + std::filesystem::path m_FileHydrationPath; + std::filesystem::path m_HydrationTempPath; + RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<StorageServerInstance>> m_Instances; + std::unordered_set<std::string> m_DeprovisioningModules; + std::unordered_set<std::string> m_ProvisioningModules; + int m_MaxInstanceCount = 0; + void UpdateStats(); + + // Capacity tracking + + int m_InstanceLimit = 1000; + ResourceMetrics m_ResourceLimits; + SystemMetrics m_HostMetrics; + + void UpdateCapacityMetrics(); + bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); +}; + +HttpHubService::Impl::Impl() +{ + m_HostMetrics = zen::GetSystemMetrics(); + m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; + m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; +} + +HttpHubService::Impl::~Impl() +{ + try + { + 
ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); + + m_Lock.WithExclusiveLock([this] { + for (auto& [ModuleId, Instance] : m_Instances) + { + Instance->Deprovision(); + } + m_Instances.clear(); + }); + } + catch (const std::exception& e) + { + ZEN_WARN("Exception during hub service shutdown: {}", e.what()); + } +} + +void +HttpHubService::Impl::UpdateCapacityMetrics() +{ + m_HostMetrics = zen::GetSystemMetrics(); + + // Update per-instance metrics +} + +void +HttpHubService::Impl::UpdateStats() +{ + m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast<int>(m_Instances.size())); }); +} + +bool +HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) +{ + if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); + + return false; + } + + if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + { + OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); + + return false; + } + + if (gsl::narrow_cast<int>(m_Instances.size()) >= m_InstanceLimit) + { + OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); + + return false; + } + + // TODO: handle additional resource metrics + + return true; +} + +/////////////////////////////////////////////////////////////////////////// + +HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique<Impl>()) +{ + using namespace std::literals; + + m_Impl->Initialize(HubBaseDir, ChildBaseDir); + + m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool { + for (const auto C : Str) + { + if (std::isalnum(C) || C == '-') + { + // fine + } + else + { + // not fine + return false; + } + } + + return true; + }); + + m_Router.RegisterRoute( + "status", + 
[this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj.BeginArray("modules"); + m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) { + Obj.BeginObject(); + Obj << "moduleId" << Instance.GetModuleId(); + Obj << "provisioned" << Instance.IsProvisioned(); + Obj.EndObject(); + }); + Obj.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "modules/{moduleid}", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + + if (Req.ServerRequest().RequestVerb() == HttpVerb::kDelete) + { + HandleModuleDelete(Req.ServerRequest(), ModuleId); + } + else + { + HandleModuleGet(Req.ServerRequest(), ModuleId); + } + }, + HttpVerb::kGet | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "modules/{moduleid}/provision", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + + std::string FailureReason = "unknown"; + HttpResponseCode ResponseCode = HttpResponseCode::OK; + + try + { + Impl::ProvisionedInstanceInfo Info; + if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) + { + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + Obj << "baseUri" << Info.BaseUri; + Obj << "port" << Info.Port; + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + + return; + } + else + { + ResponseCode = HttpResponseCode::BadRequest; + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + ResponseCode = HttpResponseCode::InternalServerError; + } + + Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "modules/{moduleid}/deprovision", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + std::string FailureReason = "unknown"; + + try + { + if 
(!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason)) + { + if (FailureReason.empty()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + else + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); + } + } + + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "stats", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj << "currentInstanceCount" << m_Impl->GetInstanceCount(); + Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount(); + Obj << "instanceLimit" << m_Impl->GetInstanceLimit(); + // Send the assembled stats object as the response body (previously the object was built but never sent) + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); +} + +HttpHubService::~HttpHubService() +{ +} + +const char* +HttpHubService::BaseUri() const +{ + return "/hub/"; +} + +void +HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId) +{ + ZEN_UNUSED(UpstreamNotificationEndpoint, InstanceId); + // TODO: store these for use in notifications, on some interval/criteria which is currently TBD +} + +void +HttpHubService::HandleRequest(zen::HttpServerRequest& Request) +{ + m_Router.HandleRequest(Request); +} + +void +HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) +{ + StorageServerInstance* Instance = nullptr; + if (!m_Impl->Find(ModuleId, &Instance)) + { + Request.WriteResponse(HttpResponseCode::NotFound); + return; + } + + CbObjectWriter Obj; + Obj << "moduleId" << Instance->GetModuleId(); + Obj << "provisioned" << 
Instance->IsProvisioned(); + Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); +} + +void +HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) +{ + StorageServerInstance* Instance = nullptr; + if (!m_Impl->Find(ModuleId, &Instance)) + { + Request.WriteResponse(HttpResponseCode::NotFound); + return; + } + + // TODO: deprovision and nuke all related storage + + CbObjectWriter Obj; + Obj << "moduleId" << Instance->GetModuleId(); + Obj << "provisioned" << Instance->IsProvisioned(); + Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); +} + +} // namespace zen diff --git a/src/zenserver/hub/hubservice.h b/src/zenserver/hub/hubservice.h new file mode 100644 index 000000000..1a5a8c57c --- /dev/null +++ b/src/zenserver/hub/hubservice.h @@ -0,0 +1,42 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include "hydration.h" + +namespace zen { + +/** ZenServer Hub Service + * + * Manages a set of storage servers on the behalf of external clients. For + * use in UEFN content worker style scenarios. 
+ * + */ +class HttpHubService : public zen::HttpService +{ +public: + HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir); + ~HttpHubService(); + + HttpHubService(const HttpHubService&) = delete; + HttpHubService& operator=(const HttpHubService&) = delete; + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + + void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId); + +private: + HttpRequestRouter m_Router; + + struct Impl; + + std::unique_ptr<Impl> m_Impl; + + void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); + void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); +}; + +} // namespace zen diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp new file mode 100644 index 000000000..52c17fe1a --- /dev/null +++ b/src/zenserver/hub/hydration.cpp @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "hydration.h" + +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> + +namespace zen { + +/////////////////////////////////////////////////////////////////////////// + +struct FileHydrator : public HydrationStrategyBase +{ + virtual void Configure(const HydrationConfig& Config) override; + virtual void Hydrate() override; + virtual void Dehydrate() override; + +private: + HydrationConfig m_Config; + std::filesystem::path m_StorageModuleRootDir; +}; + +void +FileHydrator::Configure(const HydrationConfig& Config) +{ + m_Config = Config; + + std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification)); + + if (!std::filesystem::exists(ConfigPath)) + { + throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); + } + + m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; + + CreateDirectories(m_StorageModuleRootDir); +} + +void +FileHydrator::Hydrate() +{ + ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + + // Ensure target is clean + ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); + const bool ForceRemoveReadOnlyFiles = true; + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + + bool WipeServerState = false; + + try + { + ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); + CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); + } + catch (std::exception& Ex) + { + ZEN_WARN("Copy failed: {}. 
Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); + + // We don't do the clean right here to avoid potentially running into double-throws + WipeServerState = true; + } + + if (WipeServerState) + { + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + } + + // Note that we leave the storage state intact until next dehydration replaces the content +} + +void +FileHydrator::Dehydrate() +{ + ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); + + const std::filesystem::path TargetDir = m_StorageModuleRootDir; + + // Ensure target is clean. This could be replaced with an atomic copy at a later date + // (i.e copy into a temporary directory name and rename it once complete) + + ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); + const bool ForceRemoveReadOnlyFiles = true; + CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + + bool CopySuccess = true; + + try + { + ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); + CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true}); + } + catch (std::exception& Ex) + { + ZEN_WARN("Copy failed: {}. 
Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); + + // We don't do the clean right here to avoid potentially running into double-throws + CopySuccess = false; + } + + if (!CopySuccess) + { + ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); + CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); + } + + ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); +} + +std::unique_ptr<HydrationStrategyBase> +CreateFileHydrator() +{ + return std::make_unique<FileHydrator>(); +} + +} // namespace zen diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h new file mode 100644 index 000000000..f86f2accf --- /dev/null +++ b/src/zenserver/hub/hydration.h @@ -0,0 +1,40 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenhubserver.h" + +namespace zen { + +struct HydrationConfig +{ + // Location of server state to hydrate/dehydrate + std::filesystem::path ServerStateDir; + // Temporary directory available for use during hydration/dehydration + std::filesystem::path TempDir; + // Module ID of the server state being hydrated/dehydrated + std::string ModuleId; + // Back-end specific target specification (e.g. S3 bucket, file path, etc) + std::string TargetSpecification; +}; + +/** + * @brief State hydration strategy interface + * + * An instance of this interface is used to perform hydration OR + * dehydration of server state. It's expected to be used only once + * and not reused. 
+ * + */ +struct HydrationStrategyBase +{ + virtual ~HydrationStrategyBase() = default; + + virtual void Dehydrate() = 0; + virtual void Hydrate() = 0; + virtual void Configure(const HydrationConfig& Config) = 0; +}; + +std::unique_ptr<HydrationStrategyBase> CreateFileHydrator(); + +} // namespace zen diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp new file mode 100644 index 000000000..7a4ba951d --- /dev/null +++ b/src/zenserver/hub/zenhubserver.cpp @@ -0,0 +1,303 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenhubserver.h" +#include "hubservice.h" + +#include <zencore/fmtutils.h> +#include <zencore/memory/llm.h> +#include <zencore/memory/memorytrace.h> +#include <zencore/memory/tagtrace.h> +#include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> +#include <zencore/system.h> +#include <zencore/windows.h> +#include <zenhttp/httpapiservice.h> +#include <zenutil/service.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +void +ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("hub", + "", + "upstream-notification-endpoint", + "Endpoint URL for upstream notifications", + cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), + ""); + + Options.add_option("hub", + "", + "instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), + ""); +} + +void +ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenHubServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +void +ZenHubServerConfigurator::ValidateOptions() +{ +} + 
+/////////////////////////////////////////////////////////////////////////// + +ZenHubServer::ZenHubServer() +{ +} + +ZenHubServer::~ZenHubServer() +{ + Cleanup(); +} + +int +ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenHubServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + // This is a workaround to make sure we can have automated tests. Without + // this the ranges for different child zen hub processes could overlap with + // the main test range. + ZenServerEnvironment::SetBaseChildId(1000); + + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; + + InitializeState(ServerConfig); + InitializeServices(ServerConfig); + RegisterServices(ServerConfig); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +// Stops the I/O context, joins the I/O runner thread if joinable and closes the HTTP server if present. +// std::exception-derived errors are logged here rather than propagated (this is also called from the destructor). +void +ZenHubServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenHubServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +void +ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); +} + +void +ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) +{ + ZEN_INFO("instantiating API service"); + m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); + + ZEN_INFO("instantiating hub service"); + m_HubService = std::make_unique<HttpHubService>(ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"); + 
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); +} + +void +ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); + + if (m_HubService) + { + m_Http->RegisterService(*m_HubService); + } + + if (m_ApiService) + { + m_Http->RegisterService(*m_ApiService); + } +} + +void +ZenHubServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO(R"(__________ ___ ___ ___. )" "\n" + R"(\____ /____ ____ / | \ __ _\_ |__ )" "\n" + R"( / // __ \ / \ / ~ \ | \ __ \ )" "\n" + R"( / /\ ___/| | \ \ Y / | / \_\ \)" "\n" + R"(/_______ \___ >___| / \___|_ /|____/|___ /)" "\n" + R"( \/ \/ \/ \/ \/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as HUB (pid: {})", GetCurrentProcessId()); + +#if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +#endif + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? 
"ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +#endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +////////////////////////////////////////////////////////////////////////////////// + +ZenHubServerMain::ZenHubServerMain(ZenHubServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) +{ +} + +void +ZenHubServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenHubServer Server; + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + // Server.Initialize has already logged what the issue is - just exit with failure code here. 
+ std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h new file mode 100644 index 000000000..ac14362f0 --- /dev/null +++ b/src/zenserver/hub/zenhubserver.h @@ -0,0 +1,92 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include "zenserver.h" + +namespace cxxopts { +class Options; +} +namespace zen::LuaConfig { +struct Options; +} + +namespace zen { + +class HttpApiService; +class HttpHubService; + +struct ZenHubServerConfig : public ZenServerConfig +{ + std::string UpstreamNotificationEndpoint; + std::string InstanceId; // For use in notifications +}; + +struct ZenHubServerConfigurator : public ZenServerConfiguratorBase +{ + ZenHubServerConfigurator(ZenHubServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions), m_ServerOptions(ServerOptions) + { + } + + ~ZenHubServerConfigurator() = default; + +private: + virtual void AddCliOptions(cxxopts::Options& Options) override; + virtual void AddConfigOptions(LuaConfig::Options& Options) override; + virtual void ApplyOptions(cxxopts::Options& Options) override; + virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override; + virtual void ValidateOptions() override; + + ZenHubServerConfig& m_ServerOptions; +}; + +class ZenHubServerMain : public ZenServerMain +{ +public: + ZenHubServerMain(ZenHubServerConfig& ServerOptions); + virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override; + + ZenHubServerMain(const ZenHubServerMain&) = delete; + ZenHubServerMain& operator=(const ZenHubServerMain&) = delete; + + typedef ZenHubServerConfig Config; + typedef ZenHubServerConfigurator Configurator; + +private: + ZenHubServerConfig& m_ServerOptions; +}; + +class ZenHubServer : public ZenServerBase +{ + ZenHubServer& operator=(ZenHubServer&&) = delete; + ZenHubServer(ZenHubServer&&) = delete; + +public: + ZenHubServer(); + ~ZenHubServer(); + + int Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry); + void Run(); + void Cleanup(); + + void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetTestMode(bool State) { m_TestMode = State; } + void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void 
SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + +private: + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; + bool m_DebugOptionForcedCrash = false; + + std::unique_ptr<HttpHubService> m_HubService; + std::unique_ptr<HttpApiService> m_ApiService; + + void InitializeState(const ZenHubServerConfig& ServerConfig); + void InitializeServices(const ZenHubServerConfig& ServerConfig); + void RegisterServices(const ZenHubServerConfig& ServerConfig); +}; + +} // namespace zen diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 78bce7d06..996f96da8 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -25,6 +25,8 @@ #include "storage/storageconfig.h" #include "storage/zenstorageserver.h" +#include "hub/zenhubserver.h" + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> # include <zenutil/windows/windowsservice.h> @@ -273,6 +275,8 @@ main(int argc, char* argv[]) exit(5); #endif break; + case kHub: + return AppMain<ZenHubServerMain>(argc, argv); default: case kStore: return AppMain<ZenStorageServerMain>(argc, argv); diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index fb65fa949..6ee80dc62 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -22,6 +22,7 @@ target("zenserver") add_deps("sol2") add_packages("json11") add_packages("lua") + add_packages("consul") if has_config("zenmimalloc") then add_packages("mimalloc") @@ -55,10 +56,7 @@ target("zenserver") add_ldflags("-framework Security") add_ldflags("-framework SystemConfiguration") end - - add_options("compute") - add_options("exec") - + -- to work around some unfortunate Ctrl-C behaviour on Linux/Mac due to -- our use of setsid() at startup we pass in `--no-detach` to zenserver -- ensure that it recieves signals when the user requests termination @@ -87,17 +85,60 @@ target("zenserver") end end) + after_build(function (target) - if 
has_config("zensentry") then - local crashpad_handler = "crashpad_handler" - if is_plat("windows") then - crashpad_handler = "crashpad_handler.exe" + local function copy_if_newer(src_file, dst_file, file_description) + if not os.exists(src_file) then + print("Source file '" .. src_file .. "' does not exist, cannot copy " .. file_description) + return end + local should_copy = false + if not os.exists(dst_file) then + should_copy = true + else + local src_size = os.filesize(src_file) + local dst_size = os.filesize(dst_file) + local src_mtime = os.mtime(src_file) + local dst_mtime = os.mtime(dst_file) + + if src_size ~= dst_size or src_mtime > dst_mtime then + should_copy = true + end + end + + if should_copy then + os.cp(src_file, dst_file) + print("Copied '" .. file_description .. "' to output directory") + end + end + + if has_config("zensentry") then local pkg = target:pkg("sentry-native") if pkg then local installdir = pkg:installdir() - os.cp(path.join(installdir, "bin/" .. crashpad_handler), target:targetdir()) - print("Copied " .. crashpad_handler .. " to output directory") + + local crashpad_handler = "crashpad_handler" + if is_plat("windows") then + crashpad_handler = "crashpad_handler.exe" + end + + local crashpad_handler_path = path.join(installdir, "bin/" .. 
crashpad_handler) + copy_if_newer(crashpad_handler_path, path.join(target:targetdir(), crashpad_handler), crashpad_handler) + + if is_plat("windows") then + local crashpad_wer_path = path.join(installdir, "bin/crashpad_wer.dll") + copy_if_newer(crashpad_wer_path, path.join(target:targetdir(), "crashpad_wer.dll"), "crashpad_wer.dll") + end + end + end + + local consul_pkg = target:pkg("consul") + if consul_pkg then + local installdir = consul_pkg:installdir() + local consul_bin = "consul" + if is_plat("windows") then + consul_bin = "consul.exe" end + copy_if_newer(path.join(installdir, "bin", consul_bin), path.join(target:targetdir(), consul_bin), consul_bin) end end) diff --git a/thirdparty/xmake.lua b/thirdparty/xmake.lua index 35bbf87bf..f079d803d 100644 --- a/thirdparty/xmake.lua +++ b/thirdparty/xmake.lua @@ -37,7 +37,7 @@ target('rpmalloc') set_group('thirdparty') set_languages("c17", "cxx20") if is_os("windows") then - add_cflags("/experimental:c11atomics") + add_cflags("/experimental:c11atomics", {force=true}) end add_defines("RPMALLOC_FIRST_CLASS_HEAPS=1", "ENABLE_STATISTICS=1", "ENABLE_OVERRIDE=0") add_files("rpmalloc/rpmalloc.c") @@ -73,6 +73,8 @@ add_requires("zlib", {system = false}) add_defines("EASTL_STD_ITERATOR_CATEGORY_ENABLED", "EASTL_DEPRECATIONS_FOR_2024_APRIL=EA_DISABLED") add_requires("eastl", {system = false}) +add_requires("consul", {system = false}) -- for hub tests + if has_config("zenmimalloc") and not use_asan then add_requires("mimalloc", {system = false}) end @@ -109,7 +111,6 @@ if is_plat("linux") and os.getenv("UE_TOOLCHAIN_DIR") then add_ldflags("$(projectdir)/thirdparty/ue-libcxx/lib64/libc++abi.a") set_toolset("objcopy", "$(env UE_TOOLCHAIN_DIR)/bin/llvm-objcopy") end - if has_config("zensentry") and not use_asan then if is_plat("linux") then add_requires("sentry-native 0.7.6") @@ -119,7 +120,6 @@ if has_config("zensentry") and not use_asan then add_requires("sentry-native 0.7.6", {configs = {backend = "crashpad"}}) end end - 
--add_rules("c++.unity_build") if is_mode("release") then @@ -168,8 +168,9 @@ if is_os("windows") then add_cxxflags("/experimental:deterministic") -- (more) deterministic compiler output add_ldflags("/PDBALTPATH:%_PDB%") -- deterministic pdb reference in exe - add_cxxflags("/Zc:preprocessor") -- Enable preprocessor conformance mode - add_cxxflags("/Zc:u8EscapeEncoding") -- Enable UTF-8 encoding for u8 string literals + add_cxxflags("/Zc:preprocessor") -- Enable preprocessor conformance mode + add_cxxflags("/Zc:u8EscapeEncoding") -- Enable UTF-8 encoding for u8 string literals + add_cxxflags("/Zc:inline") -- Enforce inline semantics -- add_ldflags("/MAP") end |