// Copyright Epic Games, Inc. All Rights Reserved. #include "storageserverinstance.h" #include "hydration.h" #include #include #include #include namespace zen { StorageServerInstance::StorageServerInstance(ZenServerEnvironment& RunEnvironment, const Configuration& Config, std::string_view ModuleId) : m_Config(Config) , m_ModuleId(ModuleId) , m_ServerInstance(RunEnvironment, ZenServerInstance::ServerMode::kStorageServer) { } StorageServerInstance::~StorageServerInstance() { } void StorageServerInstance::SpawnServerProcess() { ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); m_ServerInstance.ResetDeadProcess(); m_ServerInstance.SetServerExecutablePath(GetRunningExecutablePath()); m_ServerInstance.SetDataDir(m_Config.StateDir); #if ZEN_PLATFORM_WINDOWS m_ServerInstance.SetJobObject(m_JobObject); #endif ExtendableStringBuilder<256> AdditionalOptions; AdditionalOptions << "--allow-port-probing=false"; if (m_Config.HttpThreadCount != 0) { AdditionalOptions << " --http-threads=" << m_Config.HttpThreadCount; } if (m_Config.CoreLimit != 0) { AdditionalOptions << " --corelimit=" << m_Config.CoreLimit; } if (!m_Config.ConfigPath.empty()) { AdditionalOptions << " --config=\"" << MakeSafeAbsolutePath(m_Config.ConfigPath).string() << "\""; } if (!m_Config.Malloc.empty()) { AdditionalOptions << " --malloc=" << m_Config.Malloc; } if (!m_Config.Trace.empty()) { AdditionalOptions << " --trace=" << m_Config.Trace; } if (!m_Config.TraceHost.empty()) { AdditionalOptions << " --tracehost=" << m_Config.TraceHost; } if (!m_Config.TraceFile.empty()) { constexpr std::string_view ModuleIdPattern = "{moduleid}"; constexpr std::string_view PortPattern = "{port}"; std::string ResolvedTraceFile = m_Config.TraceFile; for (size_t Pos = ResolvedTraceFile.find(ModuleIdPattern); Pos != std::string::npos; Pos = ResolvedTraceFile.find(ModuleIdPattern, Pos)) { ResolvedTraceFile.replace(Pos, ModuleIdPattern.length(), m_ModuleId); } std::string PortStr = fmt::format("{}", m_Config.BasePort); for (size_t Pos = ResolvedTraceFile.find(PortPattern); Pos != std::string::npos; Pos = ResolvedTraceFile.find(PortPattern, Pos)) { ResolvedTraceFile.replace(Pos, PortPattern.length(), PortStr); } AdditionalOptions << " --tracefile=\"" << ResolvedTraceFile << "\""; } m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView()); ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort); m_ServerInstance.EnableShutdownOnDestroy(); } ProcessMetrics StorageServerInstance::GetProcessMetrics() const { ProcessMetrics Metrics; if (m_ServerInstance.IsRunning()) { zen::GetProcessMetrics(m_ServerInstance.GetProcessHandle(), Metrics); } return Metrics; } void StorageServerInstance::ProvisionLocked() { if (m_ServerInstance.IsRunning()) { ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId); return; } ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir); try { Hydrate(); SpawnServerProcess(); } catch (const std::exception& Ex) { ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during provisioning. Reason: {}", m_ModuleId, m_Config.StateDir, Ex.what()); throw; } } void StorageServerInstance::DeprovisionLocked() { if (m_ServerInstance.IsRunning()) { // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); } // Crashed or Hibernated: process already dead; skip Shutdown. // Dehydrate preserves instance state for future re-provisioning. Failure means saved state // may be stale or absent, but the process is already dead so the slot can still be released. // Swallow the exception and proceed with cleanup rather than leaving the module stuck. try { Dehydrate(); } catch (const std::exception& Ex) { ZEN_WARN("Dehydration of module {} failed during deprovisioning, current state not saved. Reason: {}", m_ModuleId, Ex.what()); } } void StorageServerInstance::ObliterateLocked() { if (m_ServerInstance.IsRunning()) { // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); } std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Obliterate(); } void StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake if (!m_ServerInstance.IsRunning()) { return; } // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); } void StorageServerInstance::WakeLocked() { // Start server in-place using existing data if (m_ServerInstance.IsRunning()) { return; } try { SpawnServerProcess(); } catch (const std::exception& Ex) { ZEN_WARN("Failed spawning server instance for module '{}', at '{}' during waking. Reason: {}", m_ModuleId, m_Config.StateDir, Ex.what()); throw; } } void StorageServerInstance::Hydrate() { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); std::unique_ptr Hydrator = CreateHydrator(Config); m_HydrationState = Hydrator->Hydrate(); } void StorageServerInstance::Dehydrate() { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(m_HydrationState); } HydrationConfig StorageServerInstance::MakeHydrationConfig(std::atomic& AbortFlag, std::atomic& PauseFlag) { HydrationConfig Config{.ServerStateDir = m_Config.StateDir, .TempDir = m_Config.TempDir, .ModuleId = m_ModuleId, .TargetSpecification = m_Config.HydrationTargetSpecification, .Options = m_Config.HydrationOptions}; if (m_Config.OptionalWorkerPool) { Config.Threading.emplace( HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); } return Config; } StorageServerInstance::SharedLockedPtr::SharedLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireShared(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireShared()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::SharedLockedPtr::SharedLockedPtr(SharedLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr::~SharedLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseShared(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::SharedLockedPtr& StorageServerInstance::SharedLockedPtr::operator=(SharedLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseShared(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::SharedLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::SharedLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ServerInstance.IsRunning(); } #if ZEN_WITH_TESTS void StorageServerInstance::SharedLockedPtr::TerminateForTesting() const { ZEN_ASSERT(m_Instance != nullptr); m_Instance->m_ServerInstance.Terminate(); } #endif StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr() : m_Lock(nullptr), m_Instance(nullptr) { } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(RwLock& Lock, StorageServerInstance* Instance, bool Wait) : m_Lock(nullptr) , m_Instance(nullptr) { ZEN_ASSERT(Instance != nullptr); if (Wait) { Lock.AcquireExclusive(); m_Lock = &Lock; m_Instance = Instance; } else { if (Lock.TryAcquireExclusive()) { m_Lock = &Lock; m_Instance = Instance; } } } StorageServerInstance::ExclusiveLockedPtr::ExclusiveLockedPtr(ExclusiveLockedPtr&& Rhs) : m_Lock(Rhs.m_Lock), m_Instance(Rhs.m_Instance) { Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr::~ExclusiveLockedPtr() { if (m_Lock != nullptr) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; } m_Instance = nullptr; } StorageServerInstance::ExclusiveLockedPtr& StorageServerInstance::ExclusiveLockedPtr::operator=(ExclusiveLockedPtr&& Rhs) { if (m_Lock) { m_Lock->ReleaseExclusive(); m_Lock = nullptr; m_Instance = nullptr; } m_Lock = Rhs.m_Lock; m_Instance = Rhs.m_Instance; Rhs.m_Lock = nullptr; Rhs.m_Instance = nullptr; return *this; } std::string_view StorageServerInstance::ExclusiveLockedPtr::GetModuleId() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ModuleId; } bool StorageServerInstance::ExclusiveLockedPtr::IsRunning() const { ZEN_ASSERT(m_Instance != nullptr); return m_Instance->m_ServerInstance.IsRunning(); } void StorageServerInstance::ExclusiveLockedPtr::Provision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->ProvisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Deprovision() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->DeprovisionLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Obliterate() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->ObliterateLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->HibernateLocked(); } void StorageServerInstance::ExclusiveLockedPtr::Wake() { ZEN_ASSERT(m_Instance != nullptr); m_Instance->WakeLocked(); } } // namespace zen