// Copyright Epic Games, Inc. All Rights Reserved. #include "storageserverinstance.h" #include "hydration.h" #include #include #include #include namespace zen { StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId) : m_Config(Config) , m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) { m_BaseDir = RunEnvironment.CreateChildDir(ModuleId); m_TempDir = Config.HydrationTempPath / ModuleId; } StorageServerInstance::~StorageServerInstance() { } void StorageServerInstance::SpawnServerProcess() { ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); m_ServerInstance.ResetDeadProcess(); m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); m_ServerInstance.SetDataDir(m_BaseDir); #if ZEN_PLATFORM_WINDOWS m_ServerInstance.SetJobObject(m_JobObject); #endif ExtendableStringBuilder<256> AdditionalOptions; AdditionalOptions << "--allow-port-probing=false"; if (m_Config.HttpThreadCount != 0) { AdditionalOptions << " --http-threads=" << m_Config.HttpThreadCount; } if (m_Config.CoreLimit != 0) { AdditionalOptions << " --corelimit=" << m_Config.CoreLimit; } if (!m_Config.ConfigPath.empty()) { AdditionalOptions << " --config=\"" << MakeSafeAbsolutePath(m_Config.ConfigPath).string() << "\""; } m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView()); ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort); m_ServerInstance.EnableShutdownOnDestroy(); } void StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const { OutMetrics.MemoryBytes = m_MemoryBytes.load(); OutMetrics.KernelTimeMs = m_KernelTimeMs.load(); OutMetrics.UserTimeMs = m_UserTimeMs.load(); OutMetrics.WorkingSetSize = m_WorkingSetSize.load(); OutMetrics.PeakWorkingSetSize = m_PeakWorkingSetSize.load(); OutMetrics.PagefileUsage = m_PagefileUsage.load(); OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load(); } void StorageServerInstance::ProvisionLocked() { if (m_ServerInstance.IsRunning()) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); return; } ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir); try { Hydrate(); SpawnServerProcess(); } catch (const std::exception& Ex) { ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during provisioning. Reason: {}", m_ModuleId, m_BaseDir, Ex.what()); throw; } } void StorageServerInstance::DeprovisionLocked() { if (m_ServerInstance.IsRunning()) { // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); } // Crashed or Hibernated: process already dead; skip Shutdown. // Dehydrate preserves instance state for future re-provisioning. Failure means saved state // may be stale or absent, but the process is already dead so the slot can still be released. // Swallow the exception and proceed with cleanup rather than leaving the module stuck. try { Dehydrate(); } catch (const std::exception& Ex) { ZEN_WARN("Dehydration of module {} failed during deprovisioning, current state not saved. Reason: {}", m_ModuleId, Ex.what()); } } void StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake if (!m_ServerInstance.IsRunning()) { return; } // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); } void StorageServerInstance::WakeLocked() { // Start server in-place using existing data if (m_ServerInstance.IsRunning()) { return; } try { SpawnServerProcess(); } catch (const std::exception& Ex) { ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", m_ModuleId, m_BaseDir, Ex.what()); throw; } } void StorageServerInstance::Hydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification}; std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { HydrationConfig Config{.ServerStateDir = m_BaseDir, .TempDir = m_TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification}; std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireShared(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireShared()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(SharedLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr::~SharedLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseShared(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr& StorageServerInstance::SharedLockedPtr::operator=(SharedLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseShared(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::SharedLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::SharedLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::UpdateMetricsLocked() { if (m_ServerInstance.IsRunning()) { ProcessMetrics Metrics; zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); m_MemoryBytes.store(Metrics.MemoryBytes); m_KernelTimeMs.store(Metrics.KernelTimeMs); m_UserTimeMs.store(Metrics.UserTimeMs); m_WorkingSetSize.store(Metrics.WorkingSetSize); m_PeakWorkingSetSize.store(Metrics.PeakWorkingSetSize); m_PagefileUsage.store(Metrics.PagefileUsage); m_PeakPagefileUsage.store(Metrics.PeakPagefileUsage); } // TODO: Resource metrics... } #if ZEN_WITH_TESTS void StorageServerInstance::SharedLockedPtr::TerminateForTesting() const { ZEN_ASSERT(m_Instance != nullptr); m_Instance->m_ServerInstance.Terminate(); } #endif StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireExclusive(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireExclusive()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr::~ExclusiveLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr& StorageServerInstance::ExclusiveLockedPtr::operator=(ExclusiveLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::ExclusiveLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::ExclusiveLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::ExclusiveLockedPtr::Provision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->ProvisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Deprovision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->DeprovisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->HibernateLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Wake() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->WakeLocked(); } } // namespace zen