// Copyright Epic Games, Inc. All Rights Reserved. #include "hubservice.h" #include "hydration.h" #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { /////////////////////////////////////////////////////////////////////////// /** * A timeline of events with sequence IDs and timestamps. Used to * track significant events for broadcasting to listeners. */ class EventTimeline { public: EventTimeline() { m_Events.reserve(1024); } ~EventTimeline() {} EventTimeline(const EventTimeline&) = delete; EventTimeline& operator=(const EventTimeline&) = delete; void RecordEvent(std::string_view EventTag, CbObject EventMetadata) { const uint64_t SequenceId = m_NextEventId++; const auto Now = std::chrono::steady_clock::now(); RwLock::ExclusiveLockScope _(m_Lock); m_Events.emplace_back(SequenceId, EventTag, Now, std::move(EventMetadata)); } struct EventRecord { uint64_t SequenceId; std::string Tag; std::chrono::steady_clock::time_point Timestamp; CbObject EventMetadata; EventRecord(uint64_t InSequenceId, std::string_view InTag, std::chrono::steady_clock::time_point InTimestamp, CbObject InEventMetadata = CbObject()) : SequenceId(InSequenceId) , Tag(InTag) , Timestamp(InTimestamp) , EventMetadata(InEventMetadata) { } }; /** * Iterate over events that have a SequenceId greater than SinceEventId * * @param Callback A callable that takes a const EventRecord& * @param SinceEventId The SequenceId to compare against */ void IterateEventsSince(auto&& Callback, uint64_t SinceEventId) { // Hold the lock for as short a time as possible eastl::fixed_vector EventsToProcess; m_Lock.WithSharedLock([&] { for (auto& Event : m_Events) { if (Event.SequenceId > SinceEventId) { EventsToProcess.push_back(Event); } } }); // Now invoke the callback outside the lock for (auto& Event : EventsToProcess) { Callback(Event); } } /** * Trim events up to (and including) the given SequenceId. Intended * to be used for cleaning up events which are not longer interesting. * * @param UpToEventId The SequenceId up to which events should be removed */ void TrimEventsUpTo(uint64_t UpToEventId) { RwLock::ExclusiveLockScope _(m_Lock); auto It = std::remove_if(m_Events.begin(), m_Events.end(), [UpToEventId](const EventRecord& Event) { return Event.SequenceId <= UpToEventId; }); m_Events.erase(It, m_Events.end()); } private: std::atomic m_NextEventId{0}; RwLock m_Lock; std::vector m_Events; }; ////////////////////////////////////////////////////////////////////////// struct ResourceMetrics { uint64_t DiskUsageBytes = 0; uint64_t MemoryUsageBytes = 0; }; /** * Storage Server Instance * * This class manages the lifecycle of a storage server instance, and * provides functions to query its state. There should be one instance * per module ID. */ struct StorageServerInstance { StorageServerInstance(ZenServerEnvironment& RunEnvironment, std::string_view ModuleId, std::filesystem::path FileHydrationPath, std::filesystem::path HydrationTempPath); ~StorageServerInstance(); void Provision(); void Deprovision(); void Hibernate(); void Wake(); const ResourceMetrics& GetResourceMetrics() const { return m_ResourceMetrics; } inline std::string_view GetModuleId() const { return m_ModuleId; } inline bool IsProvisioned() const { return m_IsProvisioned.load(); } inline uint16_t GetBasePort() const { return m_ServerInstance.GetBasePort(); } #if ZEN_PLATFORM_WINDOWS void SetJobObject(JobObject* InJobObject) { m_JobObject = InJobObject; } #endif private: void WakeLocked(); RwLock m_Lock; std::string m_ModuleId; std::atomic m_IsProvisioned{false}; std::atomic m_IsHibernated{false}; ZenServerInstance m_ServerInstance; std::filesystem::path m_BaseDir; std::filesystem::path m_TempDir; std::filesystem::path m_HydrationPath; ResourceMetrics m_ResourceMetrics; #if ZEN_PLATFORM_WINDOWS JobObject* m_JobObject = nullptr; #endif void SpawnServerProcess(); void Hydrate(); void Dehydrate(); }; StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, std::string_view ModuleId, std::filesystem::path FileHydrationPath, std::filesystem::path HydrationTempPath) : m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) , m_HydrationPath(FileHydrationPath) { m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); m_TempDir = HydrationTempPath / ModuleId; } StorageServerInstance::~StorageServerInstance() { } void StorageServerInstance::SpawnServerProcess() { ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); m_ServerInstance.SetDataDir(m_BaseDir); #if ZEN_PLATFORM_WINDOWS m_ServerInstance.SetJobObject(m_JobObject); #endif const uint16_t BasePort = m_ServerInstance.SpawnServerAndWaitUntilReady(); ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, BasePort); m_ServerInstance.EnableShutdownOnDestroy(); } void StorageServerInstance::Provision() { RwLock::ExclusiveLockScope _(m_Lock); if (m_IsProvisioned) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); return; } if (m_IsHibernated) { WakeLocked(); } else { ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); Hydrate(); SpawnServerProcess(); } m_IsProvisioned = true; } void StorageServerInstance::Deprovision() { RwLock::ExclusiveLockScope _(m_Lock); if (!m_IsProvisioned) { ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned", m_ModuleId); return; } ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); m_ServerInstance.Shutdown(); Dehydrate(); m_IsProvisioned = false; } void StorageServerInstance::Hibernate() { // Signal server to shut down, but keep data around for later wake RwLock::ExclusiveLockScope _(m_Lock); if (!m_IsProvisioned) { ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned", m_ModuleId); return; } if (m_IsHibernated) { ZEN_WARN("Storage server instance for module '{}' is already hibernated", m_ModuleId); return; } if (!m_ServerInstance.IsRunning()) { ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); // This is an unexpected state. Should consider the instance invalid? return; } try { m_ServerInstance.Shutdown(); m_IsHibernated = true; m_IsProvisioned = false; return; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); } } void StorageServerInstance::Wake() { RwLock::ExclusiveLockScope _(m_Lock); WakeLocked(); } void StorageServerInstance::WakeLocked() { // Start server in-place using existing data if (!m_IsHibernated) { ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); return; } ZEN_ASSERT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); try { SpawnServerProcess(); m_IsHibernated = false; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); // TODO: this instance should be marked as invalid } } void StorageServerInstance::Hydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; std::unique_ptr Hydrator = CreateFileHydrator(); Hydrator->Configure(Config); Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = WideToUtf8(m_HydrationPath.native())}; std::unique_ptr Hydrator = CreateFileHydrator(); Hydrator->Configure(Config); Hydrator->Dehydrate(); } ////////////////////////////////////////////////////////////////////////// struct HttpHubService::Impl { Impl(const Impl&) = delete; Impl& operator=(const Impl&) = delete; Impl(); ~Impl(); void Initialize(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) { m_RunEnvironment.InitializeForHub(HubBaseDir, ChildBaseDir); m_FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", m_FileHydrationPath); m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); // This is necessary to ensure the hub assigns a distinct port range. // We need to do this primarily because otherwise automated tests will // fail as the test runner will create processes in the default range. // We should probably make this configurable or dynamic for maximum // flexibility, and to allow running multiple hubs on the same host if // necessary. m_RunEnvironment.SetNextPortNumber(21000); #if ZEN_PLATFORM_WINDOWS if (m_UseJobObject) { m_JobObject.Initialize(); if (m_JobObject.IsValid()) { ZEN_INFO("Job object initialized for hub service child process management"); } else { ZEN_WARN("Failed to initialize job object; child processes will not be auto-terminated on hub crash"); } } #endif } void Cleanup() { RwLock::ExclusiveLockScope _(m_Lock); m_Instances.clear(); } struct ProvisionedInstanceInfo { std::string BaseUri; uint16_t Port; }; /** * Provision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to provision. * @param OutInfo If successful, information about the provisioned instance will be returned here. * @param OutReason If unsuccessful, the reason will be returned here. */ bool Provision(std::string_view ModuleId, ProvisionedInstanceInfo& OutInfo, std::string& OutReason) { StorageServerInstance* Instance = nullptr; bool IsNewInstance = false; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Instances.find(std::string(ModuleId)); It == m_Instances.end()) { std::string Reason; if (!CanProvisionInstance(ModuleId, /* out */ Reason)) { ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); OutReason = Reason; return false; } IsNewInstance = true; auto NewInstance = std::make_unique(m_RunEnvironment, ModuleId, m_FileHydrationPath, m_HydrationTempPath); #if ZEN_PLATFORM_WINDOWS if (m_JobObject.IsValid()) { NewInstance->SetJobObject(&m_JobObject); } #endif Instance = NewInstance.get(); m_Instances.emplace(std::string(ModuleId), std::move(NewInstance)); ZEN_INFO("Created new storage server instance for module '{}'", ModuleId); } else { Instance = It->second.get(); } m_ProvisioningModules.emplace(std::string(ModuleId)); } ZEN_ASSERT(Instance != nullptr); auto RemoveProvisioningModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_ProvisioningModules.erase(std::string(ModuleId)); }); // NOTE: this is done while not holding the lock, as provisioning may take time // and we don't want to block other operations. We track which modules are being // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision // those modules while in this state. UpdateStats(); try { Instance->Provision(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); if (IsNewInstance) { // Clean up RwLock::ExclusiveLockScope _(m_Lock); m_Instances.erase(std::string(ModuleId)); } return false; } OutInfo.Port = Instance->GetBasePort(); // TODO: base URI? Would need to know what host name / IP to use return true; } /** * Deprovision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to deprovision. * @param OutReason If unsuccessful, the reason will be returned here. * @return true if the instance was found and deprovisioned, false otherwise. */ bool Deprovision(const std::string& ModuleId, std::string& OutReason) { std::unique_ptr Instance; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) { OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); return false; } if (auto It = m_Instances.find(ModuleId); It == m_Instances.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); // Not found, OutReason should be empty return false; } else { Instance = std::move(It->second); m_Instances.erase(It); m_DeprovisioningModules.emplace(ModuleId); } } // The module is deprovisioned outside the lock to avoid blocking other operations. // // To ensure that no new provisioning can occur while we're deprovisioning, // we add the module ID to m_DeprovisioningModules and remove it once // deprovisioning is complete. auto _ = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_DeprovisioningModules.erase(ModuleId); }); Instance->Deprovision(); return true; } /** * Find a storage server instance for the given module ID. * * Beware that as this returns a raw pointer to the instance, the caller must ensure * that the instance is not deprovisioned while in use. * * @param ModuleId The ID of the module to find. * @param OutInstance If found, the instance will be returned here. * @return true if the instance was found, false otherwise. */ bool Find(std::string_view ModuleId, StorageServerInstance** OutInstance = nullptr) { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Instances.find(std::string(ModuleId)); It != m_Instances.end()) { if (OutInstance) { *OutInstance = It->second.get(); } return true; } else if (OutInstance) { *OutInstance = nullptr; } return false; } /** * Enumerate all storage server instances. * * @param Callback The callback to invoke for each instance. Note that you should * not do anything heavyweight in the callback as it is invoked while holding * a shared lock. */ void EnumerateModules(auto&& Callback) { RwLock::SharedLockScope _(m_Lock); for (auto& It : m_Instances) { Callback(*It.second); } } int GetInstanceCount() { RwLock::SharedLockScope _(m_Lock); return gsl::narrow_cast(m_Instances.size()); } inline int GetInstanceLimit() { return m_InstanceLimit; } inline int GetMaxInstanceCount() { return m_MaxInstanceCount; } bool m_UseJobObject = true; private: ZenServerEnvironment m_RunEnvironment; std::filesystem::path m_FileHydrationPath; std::filesystem::path m_HydrationTempPath; #if ZEN_PLATFORM_WINDOWS JobObject m_JobObject; #endif RwLock m_Lock; std::unordered_map> m_Instances; std::unordered_set m_DeprovisioningModules; std::unordered_set m_ProvisioningModules; int m_MaxInstanceCount = 0; void UpdateStats(); // Capacity tracking int m_InstanceLimit = 1000; ResourceMetrics m_ResourceLimits; SystemMetrics m_HostMetrics; void UpdateCapacityMetrics(); bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); }; HttpHubService::Impl::Impl() { m_HostMetrics = zen::GetSystemMetrics(); m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024; m_ResourceLimits.MemoryUsageBytes = 16ull * 1024 * 1024 * 1024; } HttpHubService::Impl::~Impl() { try { ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); m_Lock.WithExclusiveLock([this] { for (auto& [ModuleId, Instance] : m_Instances) { Instance->Deprovision(); } m_Instances.clear(); }); } catch (const std::exception& e) { ZEN_WARN("Exception during hub service shutdown: {}", e.what()); } } void HttpHubService::Impl::UpdateCapacityMetrics() { m_HostMetrics = zen::GetSystemMetrics(); // Update per-instance metrics } void HttpHubService::Impl::UpdateStats() { m_Lock.WithSharedLock([this] { m_MaxInstanceCount = Max(m_MaxInstanceCount, gsl::narrow_cast(m_Instances.size())); }); } bool HttpHubService::Impl::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) { if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) { OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); return false; } if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) { OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); return false; } if (gsl::narrow_cast(m_Instances.size()) >= m_InstanceLimit) { OutReason = fmt::format("instance limit exceeded ({})", m_InstanceLimit); return false; } // TODO: handle additional resource metrics return true; } /////////////////////////////////////////////////////////////////////////// HttpHubService::HttpHubService(std::filesystem::path HubBaseDir, std::filesystem::path ChildBaseDir) : m_Impl(std::make_unique()) { using namespace std::literals; m_Impl->Initialize(HubBaseDir, ChildBaseDir); m_Router.AddMatcher("moduleid", [](std::string_view Str) -> bool { for (const auto C : Str) { if (std::isalnum(C) || C == '-') { // fine } else { // not fine return false; } } return true; }); m_Router.RegisterRoute( "status", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; Obj.BeginArray("modules"); m_Impl->EnumerateModules([&Obj](StorageServerInstance& Instance) { Obj.BeginObject(); Obj << "moduleId" << Instance.GetModuleId(); Obj << "provisioned" << Instance.IsProvisioned(); Obj.EndObject(); }); Obj.EndArray(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "modules/{moduleid}", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); if (Req.ServerRequest().RequestVerb() == HttpVerb::kDelete) { HandleModuleDelete(Req.ServerRequest(), ModuleId); } else { HandleModuleGet(Req.ServerRequest(), ModuleId); } }, HttpVerb::kGet | HttpVerb::kDelete); m_Router.RegisterRoute( "modules/{moduleid}/provision", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); std::string FailureReason = "unknown"; HttpResponseCode ResponseCode = HttpResponseCode::OK; try { Impl::ProvisionedInstanceInfo Info; if (m_Impl->Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) { CbObjectWriter Obj; Obj << "moduleId" << ModuleId; Obj << "baseUri" << Info.BaseUri; Obj << "port" << Info.Port; Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); return; } else { ResponseCode = HttpResponseCode::BadRequest; } } catch (const std::exception& Ex) { ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what()); FailureReason = Ex.what(); ResponseCode = HttpResponseCode::InternalServerError; } Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); m_Router.RegisterRoute( "modules/{moduleid}/deprovision", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); std::string FailureReason = "unknown"; try { if (!m_Impl->Deprovision(std::string(ModuleId), /* out */ FailureReason)) { if (FailureReason.empty()) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); } else { return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); } } CbObjectWriter Obj; Obj << "moduleId" << ModuleId; return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); FailureReason = Ex.what(); } Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); m_Router.RegisterRoute( "stats", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; Obj << "currentInstanceCount" << m_Impl->GetInstanceCount(); Obj << "maxInstanceCount" << m_Impl->GetMaxInstanceCount(); Obj << "instanceLimit" << m_Impl->GetInstanceLimit(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, HttpVerb::kGet); } HttpHubService::~HttpHubService() { } void HttpHubService::SetUseJobObject(bool Enable) { m_Impl->m_UseJobObject = Enable; } const char* HttpHubService::BaseUri() const { return "/hub/"; } void HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId) { ZEN_UNUSED(UpstreamNotificationEndpoint, InstanceId); // TODO: store these for use in notifications, on some interval/criteria which is currently TBD } void HttpHubService::HandleRequest(zen::HttpServerRequest& Request) { m_Router.HandleRequest(Request); } void HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) { StorageServerInstance* Instance = nullptr; if (!m_Impl->Find(ModuleId, &Instance)) { Request.WriteResponse(HttpResponseCode::NotFound); return; } CbObjectWriter Obj; Obj << "moduleId" << Instance->GetModuleId(); Obj << "provisioned" << Instance->IsProvisioned(); Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); } void HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId) { StorageServerInstance* Instance = nullptr; if (!m_Impl->Find(ModuleId, &Instance)) { Request.WriteResponse(HttpResponseCode::NotFound); return; } // TODO: deprovision and nuke all related storage CbObjectWriter Obj; Obj << "moduleId" << Instance->GetModuleId(); Obj << "provisioned" << Instance->IsProvisioned(); Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); } } // namespace zen