// Copyright Epic Games, Inc. All Rights Reserved. #include "storageserverinstance.h" #include "hydration.h" #include #include #include #include namespace zen { StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId) : m_Config(Config) , m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) { m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); m_TempDir = Config.HydrationTempPath / ModuleId; } StorageServerInstance::~StorageServerInstance() { } void StorageServerInstance::SpawnServerProcess() { ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); m_ServerInstance.SetDataDir(m_BaseDir); #if ZEN_PLATFORM_WINDOWS m_ServerInstance.SetJobObject(m_JobObject); #endif ExtendableStringBuilder<256> AdditionalOptions; AdditionalOptions << "--allow-port-probing=false"; if (m_Config.HttpThreadCount != 0) { AdditionalOptions << " --http-threads=" << m_Config.HttpThreadCount; } if (m_Config.CoreLimit != 0) { AdditionalOptions << " --corelimit=" << m_Config.CoreLimit; } if (!m_Config.ConfigPath.empty()) { AdditionalOptions << " --config=\"" << MakeSafeAbsolutePath(m_Config.ConfigPath).string() << "\""; } m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView()); ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort); m_ServerInstance.EnableShutdownOnDestroy(); } void StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const { OutMetrics.MemoryBytes = m_MemoryBytes.load(); OutMetrics.KernelTimeMs = m_KernelTimeMs.load(); OutMetrics.UserTimeMs = m_UserTimeMs.load(); OutMetrics.WorkingSetSize = m_WorkingSetSize.load(); OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load(); OutMetrics.PagefileUsage = m_PagefileUsage.load(); OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); } void StorageServerInstance::ProvisionLocked() { if (m_State.load() == HubInstanceState::Provisioned) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); return; } if (m_State.load() == HubInstanceState::Hibernated) { if (WakeLocked()) { return; } // Wake failed; proceed with a fresh provision (discards hibernated data) m_State = HubInstanceState::Unprovisioned; } else if (m_State.load() != HubInstanceState::Unprovisioned) { ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision", m_ModuleId, ToString(m_State.load())); return; } ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); m_State = HubInstanceState::Provisioning; Hydrate(); SpawnServerProcess(); m_State = HubInstanceState::Provisioned; } void StorageServerInstance::DeprovisionLocked() { if (m_State.load() != HubInstanceState::Provisioned) { ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')", m_ModuleId, ToString(m_State.load())); return; } ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId); m_State = HubInstanceState::Deprovisioning; m_ServerInstance.Shutdown(); Dehydrate(); m_State = HubInstanceState::Unprovisioned; } void StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake if (m_State.load() != HubInstanceState::Provisioned) { ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned (state: '{}')", m_ModuleId, ToString(m_State.load())); return; } if (!m_ServerInstance.IsRunning()) { ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); // This is an unexpected state. Should consider the instance invalid? return; } m_State = HubInstanceState::Hibernating; try { m_ServerInstance.Shutdown(); m_State = HubInstanceState::Hibernated; return; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running } } bool StorageServerInstance::WakeLocked() { // Start server in-place using existing data if (m_State.load() != HubInstanceState::Hibernated) { ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); return true; // Instance is already usable (noop success) } ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); m_State = HubInstanceState::Waking; try { SpawnServerProcess(); m_State = HubInstanceState::Provisioned; return true; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what()); m_State = HubInstanceState::Hibernated; return false; } } void StorageServerInstance::Hydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification}; std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification}; std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireShared(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireShared()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(SharedLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr::~SharedLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseShared(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr& StorageServerInstance::SharedLockedPtr::operator=(SharedLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseShared(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::SharedLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::SharedLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::UpdateMetricsLocked() { if (m_State.load() == HubInstanceState::Provisioned) { ProcessMetrics Metrics; zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); m_MemoryBytes.store(Metrics.MemoryBytes); m_KernelTimeMs.store(Metrics.KernelTimeMs); m_UserTimeMs.store(Metrics.UserTimeMs); m_WorkingSetSize.store(Metrics.WorkingSetSize); m_PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); m_PagefileUsage.store(Metrics.PagefileUsage); m_PeakPagefileUsage.store(Metrics.PeakPagefileUsage); } // TODO: Resource metrics... } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireExclusive(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireExclusive()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr::~ExclusiveLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr& StorageServerInstance::ExclusiveLockedPtr::operator=(ExclusiveLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::ExclusiveLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::ExclusiveLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::ExclusiveLockedPtr::Provision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->ProvisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Deprovision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->DeprovisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->HibernateLocked(); } bool StorageServerInstance::ExclusiveLockedPtr::Wake() { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->WakeLocked(); } } // namespace zen