// Copyright Epic Games, Inc. All Rights Reserved. #include "zenserver.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif #if ZEN_PLATFORM_LINUX # include #endif #if ZEN_PLATFORM_MAC # include #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include ////////////////////////////////////////////////////////////////////////// #include "config.h" #include "diag/logging.h" #include namespace zen { static const FLLMTag& GetZenserverTag() { static FLLMTag _("zenserver"); return _; } namespace utils { extern std::atomic_uint32_t SignalCounter[NSIG]; } using namespace std::literals; namespace utils { asio::error_code ResolveHostname(asio::io_context& Ctx, std::string_view Host, std::string_view DefaultPort, std::vector& OutEndpoints) { std::string_view Port = DefaultPort; if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) { Port = Host.substr(Idx + 1); Host = Host.substr(0, Idx); } asio::ip::tcp::resolver Resolver(Ctx); asio::error_code ErrorCode; asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); if (!ErrorCode) { for (const asio::ip::tcp::endpoint Ep : Endpoints) { OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); } } return ErrorCode; } } // namespace utils ////////////////////////////////////////////////////////////////////////// ZenServer::ZenServer() { } ZenServer::~ZenServer() { } void ZenServer::OnReady() { m_ServerEntry->SignalReady(); if (m_IsReadyFunc) { m_IsReadyFunc(); } } int ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_MEMSCOPE(GetZenserverTag()); const std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); if (NamedMutex::Exists(MutexName)) { ZEN_WARN("Mutex '{}' already exists - is another instance already running?", MutexName); return -1; } m_UseSentry = ServerOptions.NoSentry == false; m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; m_IsPowerCycle = ServerOptions.IsPowerCycle; const int ParentPid = ServerOptions.OwnerPid; m_StartupScrubOptions = ServerOptions.ScrubOptions; if (ParentPid) { std::error_code Ec; ProcessHandle OwnerProcess; OwnerProcess.Initialize(ParentPid, /* out */ Ec); if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}. Reason: '{}'", ParentPid, Ec.message()); // If the pid is not reachable should we just shut down immediately? the intended owner process // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port if (m_ServerMutex.Create(MutexName) == false) { ThrowLastError(fmt::format("Failed to create mutex '{}'", MutexName).c_str()); } InitializeState(ServerOptions); m_JobQueue = MakeJobQueue(8, "backgroundjobs"); m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, .AbsLogPath = ServerOptions.AbsLogFile, .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass), .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); // Ok so now we're configured, let's kick things off m_Http = CreateHttpServer(ServerOptions.HttpServerConfig); int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort, ServerOptions.DataDir); ZEN_ASSERT(EffectiveBasePort > 0); // Setup authentication manager { std::string EncryptionKey = ServerOptions.EncryptionKey; if (EncryptionKey.empty()) { EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; ZEN_WARN("using default encryption key"); } std::string EncryptionIV = ServerOptions.EncryptionIV; if (EncryptionIV.empty()) { EncryptionIV = "0123456789abcdef"; ZEN_WARN("using default encryption initialization vector"); } m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) { m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); } } m_AuthService = std::make_unique(*m_AuthMgr); m_Http->RegisterService(*m_AuthService); m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); m_StatsReporter.Initialize(ServerOptions.StatsConfig); if (ServerOptions.StatsConfig.Enabled) { EnqueueStatsReportingTimer(); } m_Http->RegisterService(m_StatusService); m_StatusService.RegisterHandler("status", *this); // Initialize storage and services ZEN_INFO("initializing storage"); CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); ZEN_INFO("instantiating project service"); m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue, *m_OpenProcessCache, ProjectStore::Configuration{}); m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr}); if (ServerOptions.WorksSpacesConfig.Enabled) { m_Workspaces.reset(new Workspaces()); m_HttpWorkspacesService.reset( new HttpWorkspacesService(m_StatusService, m_StatsService, {.SystemRootDir = ServerOptions.SystemRootDir, .AllowConfigurationChanges = ServerOptions.WorksSpacesConfig.AllowConfigurationChanges}, *m_Workspaces)); } if (ServerOptions.BuildStoreConfig.Enabled) { BuildStoreConfig BuildsCfg; BuildsCfg.RootDirectory = m_DataRoot / "builds"; BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit; m_BuildStore = std::make_unique(std::move(BuildsCfg), m_GcManager); } if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); } else { ZEN_INFO("NOT instantiating structured cache service"); } m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics #if ZEN_WITH_TESTS m_Http->RegisterService(m_TestingService); #endif if (m_HttpProjectService) { m_Http->RegisterService(*m_HttpProjectService); } if (m_HttpWorkspacesService) { m_Http->RegisterService(*m_HttpWorkspacesService); } m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } if (ServerOptions.ObjectStoreEnabled) { ObjectStoreConfig ObjCfg; ObjCfg.RootDirectory = m_DataRoot / "obj"; for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) { ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; ObjCfg.Buckets.push_back(std::move(NewBucket)); } m_ObjStoreService = std::make_unique(m_StatusService, std::move(ObjCfg)); m_Http->RegisterService(*m_ObjStoreService); } if (ServerOptions.BuildStoreConfig.Enabled) { m_BuildStoreService = std::make_unique(m_StatusService, m_StatsService, *m_BuildStore); m_Http->RegisterService(*m_BuildStoreService); } #if ZEN_WITH_VFS m_VfsService = std::make_unique(m_StatusService); m_VfsService->AddService(Ref(m_ProjectStore)); m_VfsService->AddService(Ref(m_CacheStore)); m_Http->RegisterService(*m_VfsService); #endif ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", ServerOptions.GcConfig.Enabled, NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull), NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull)); GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds), .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1_Deprecated, .CompactBlockUsageThresholdPercent = ServerOptions.GcConfig.CompactBlockUsageThresholdPercent, .Verbose = ServerOptions.GcConfig.Verbose, .SingleThreaded = ServerOptions.GcConfig.SingleThreaded, .AttachmentPassCount = ServerOptions.GcConfig.AttachmentPassCount}; m_GcScheduler.Initialize(GcConfig); // Create and register admin interface last to make sure all is properly initialized m_AdminService = std::make_unique(m_GcScheduler, *m_JobQueue, m_CacheStore.Get(), m_CidStore.get(), m_ProjectStore, m_BuildStore.get(), HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, ServerOptions); m_Http->RegisterService(*m_AdminService); return EffectiveBasePort; } void ZenServer::InitializeState(const ZenServerOptions& ServerOptions) { EnqueueSigIntTimer(); // Check root manifest to deal with schema versioning bool WipeState = false; std::string WipeReason = "Unspecified"; if (ServerOptions.IsCleanStart) { WipeState = true; WipeReason = "clean start requested"; } bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; Oid StateId = Oid::Zero; DateTime CreatedWhen{0}; if (!WipeState) { FileContents ManifestData = ReadFile(ManifestPath); if (ManifestData.ErrorCode) { if (ServerOptions.IsFirstRun) { ZEN_INFO("Initializing state at '{}'", m_DataRoot); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); } } else { IoBuffer Manifest = ManifestData.Flatten(); if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); ValidationResult != CbValidateError::None) { ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); WipeState = true; WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult)); } else { m_RootManifest = LoadCompactBinaryObject(Manifest); const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); StateId = m_RootManifest["state_id"].AsObjectId(); CreatedWhen = m_RootManifest["created"].AsDateTime(); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { std::filesystem::path ManifestSkipSchemaChangePath = m_DataRoot / "root_manifest.ignore_schema_mismatch"; if (ManifestVersion != 0 && IsFile(ManifestSkipSchemaChangePath)) { ZEN_INFO( "Schema version {} found in '{}' does not match {}, ignoring mismatch due to existance of '{}' and updating " "schema version", ManifestVersion, ManifestPath, ZEN_CFG_SCHEMA_VERSION, ManifestSkipSchemaChangePath); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); } } } } } if (StateId == Oid::Zero) { StateId = Oid::NewOid(); UpdateManifest = true; } const DateTime Now = DateTime::Now(); if (CreatedWhen.GetTicks() == 0) { CreatedWhen = Now; UpdateManifest = true; } // Handle any state wipe if (WipeState) { ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); std::error_code Ec; for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) { if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) { ZEN_INFO("Deleting '{}'", DirEntry.path()); DeleteDirectories(DirEntry.path(), Ec); if (Ec) { ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); } } } ZEN_INFO("Wiped all directories in data root"); UpdateManifest = true; } // Write manifest { CbObjectWriter Cbo; Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId; m_RootManifest = Cbo.Save(); if (UpdateManifest) { TemporaryFile::SafeWriteFile(ManifestPath, m_RootManifest.GetBuffer().GetView()); } if (!ServerOptions.IsTest) { try { EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath); } catch (const std::exception& Ex) { ZEN_WARN("Unable to emit central manifest: ", Ex.what()); } } } // Write state marker { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv; WriteFile(StateMarkerPath, IoBuffer(IoBuffer::Wrap, StateMarkerContent.data(), StateMarkerContent.size())); EnqueueStateMarkerTimer(); } EnqueueStateExitFlagTimer(); } void ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { using namespace std::literals; ZEN_INFO("instantiating structured cache service"); ZenCacheStore::Configuration Config; Config.AllowAutomaticCreationOfNamespaces = true; Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}; for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs) { const std::string& BucketName = It.first; const ZenStructuredCacheBucketConfig& ZenBucketConfig = It.second; ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize, .PayloadAlignment = ZenBucketConfig.PayloadAlignment, .MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold, .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold}; Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig); } Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.PayloadAlignment = ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold, Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; if (ServerOptions.IsDedicated) { Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024; } m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker()); m_OpenProcessCache = std::make_unique(); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast(UpstreamConfig.UpstreamThreadCount); } m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); m_UpstreamService = std::make_unique(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { // Zen upstream { std::vector ZenUrls = UpstreamConfig.ZenConfig.Urls; if (!UpstreamConfig.ZenConfig.Dns.empty()) { for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) { if (!Dns.empty()) { const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls); if (Err) { ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message()); } } } } std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); if (!ZenUrls.empty()) { const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; std::unique_ptr ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint( {.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } // Jupiter upstream if (UpstreamConfig.JupiterConfig.Url.empty() == false) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; auto Options = JupiterClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.JupiterConfig.Url, .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; std::unique_ptr JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } m_StructuredCacheService = std::make_unique(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache, m_GcManager.GetDiskWriteBlocker(), *m_OpenProcessCache); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); m_StatsReporter.AddProvider(m_CacheStore.Get()); m_StatsReporter.AddProvider(m_CidStore.get()); } void ZenServer::Run() { if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { ZEN_INFO( "__________ _________ __ \n" "\\____ /____ ____ / _____// |_ ___________ ____ \n" " / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ \n" " / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ \n" "/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >\n" " \\/ \\/ \\/ \\/ \\/ \n"); } ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId()); #if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } #endif #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } #endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode; SetNewState(kRunning); OnReady(); if (!m_StartupScrubOptions.empty()) { using namespace std::literals; ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions); bool DoScrub = true; bool DoWait = false; GcScheduler::TriggerScrubParams ScrubParams; ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) { if (Token == "nocas"sv) { ScrubParams.SkipCas = true; } else if (Token == "nodelete"sv) { ScrubParams.SkipDelete = true; } else if (Token == "nogc"sv) { ScrubParams.SkipGc = true; } else if (Token == "no"sv) { DoScrub = false; } else if (Token == "wait"sv) { DoWait = true; } return true; }); if (DoScrub) { m_GcScheduler.TriggerScrub(ScrubParams); if (DoWait) { auto State = m_GcScheduler.Status(); while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped)) { Sleep(500); State = m_GcScheduler.Status(); } ZEN_INFO("waiting for Scrub/GC to complete..."); while (State == GcSchedulerStatus::kRunning) { Sleep(500); State = m_GcScheduler.Status(); } ZEN_INFO("Scrub/GC completed"); } } } if (m_IsPowerCycle) { ZEN_INFO("Power cycle mode enabled -- shutting down"); RequestExit(0); } m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } void ZenServer::RequestExit(int ExitCode) { if (RequestApplicationExit(ExitCode)) { if (m_Http) { m_Http->RequestExit(); } } } void ZenServer::Cleanup() { ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } if (m_Http) { m_Http->Close(); } if (m_JobQueue) { m_JobQueue->Stop(); } m_StatsReporter.Shutdown(); m_GcScheduler.Shutdown(); Flush(); ShutdownWorkerPools(); m_AdminService.reset(); m_VfsService.reset(); m_ObjStoreService.reset(); m_FrontendService.reset(); m_BuildStoreService.reset(); m_BuildStore = {}; m_StructuredCacheService.reset(); m_UpstreamService.reset(); m_UpstreamCache.reset(); m_CacheStore = {}; m_OpenProcessCache.reset(); m_HttpWorkspacesService.reset(); m_Workspaces.reset(); m_HttpProjectService.reset(); m_ProjectStore = {}; m_CidStore.reset(); m_AuthService.reset(); m_AuthMgr.reset(); m_Http = {}; m_JobQueue.reset(); } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void ZenServer::EnsureIoRunner() { ZEN_MEMSCOPE(GetZenserverTag()); if (!m_IoRunner.joinable()) { m_IoRunner = std::thread{[this] { SetCurrentThreadName("timer_io"); m_IoContext.run(); }}; } } void ZenServer::EnqueueProcessMonitorTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_PidCheckTimer.expires_after(std::chrono::seconds(1)); m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); EnsureIoRunner(); } void ZenServer::EnqueueStateMarkerTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_StateMakerTimer.expires_after(std::chrono::seconds(5)); m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); EnsureIoRunner(); } void ZenServer::EnqueueSigIntTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_SigIntTimer.expires_after(std::chrono::milliseconds(500)); m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); }); EnsureIoRunner(); } void ZenServer::EnqueueStateExitFlagTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_StateExitFlagTimer.expires_after(std::chrono::milliseconds(500)); m_StateExitFlagTimer.async_wait([this](const asio::error_code&) { CheckStateExitFlag(); }); EnsureIoRunner(); } void ZenServer::EnqueueStatsReportingTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_StatsReportingTimer.expires_after(std::chrono::milliseconds(500)); m_StatsReportingTimer.async_wait([this](const asio::error_code& Ec) { if (!Ec) { m_StatsReporter.ReportStats(); EnqueueStatsReportingTimer(); } }); EnsureIoRunner(); } void ZenServer::CheckStateMarker() { ZEN_MEMSCOPE(GetZenserverTag()); std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; try { if (!IsFile(StateMarkerPath)) { ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); RequestExit(1); return; } } catch (const std::exception& Ex) { ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); RequestExit(1); return; } EnqueueStateMarkerTimer(); } void ZenServer::CheckSigInt() { if (utils::SignalCounter[SIGINT] > 0) { ZEN_INFO("SIGINT triggered (Ctrl+C) for process {}, exiting", zen::GetCurrentProcessId()); RequestExit(128 + SIGINT); return; } if (utils::SignalCounter[SIGTERM] > 0) { ZEN_INFO("SIGTERM triggered for process {}, exiting", zen::GetCurrentProcessId()); RequestExit(128 + SIGTERM); return; } EnqueueSigIntTimer(); } void ZenServer::CheckStateExitFlag() { if (m_ServerEntry && m_ServerEntry->IsShutdownRequested()) { RequestExit(0); return; } EnqueueStateExitFlagTimer(); } bool ZenServer::UpdateProcessMonitor() { // Pick up any new "owner" processes std::set AddedPids; for (auto& PidEntry : m_ServerEntry->SponsorPids) { if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) { if (PidEntry.compare_exchange_strong(ThisPid, 0)) { if (AddedPids.insert(ThisPid).second) { m_ProcessMonitor.AddPid(ThisPid); ZEN_INFO("added process with pid {} as a sponsor process", ThisPid); } } } } return m_ProcessMonitor.IsRunning(); } void ZenServer::CheckOwnerPid() { bool IsRunning = UpdateProcessMonitor(); if (IsRunning) { m_FoundNoActiveSponsors = false; EnqueueProcessMonitorTimer(); } else { // Delay exit one iteration to avoid race conditions where one process detaches // and another attaches if (m_FoundNoActiveSponsors) { ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } else { m_FoundNoActiveSponsors = true; EnqueueProcessMonitorTimer(); } } } void ZenServer::ScrubStorage() { Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); WorkerThreadPool ThreadPool{1, "Scrub"}; ScrubContext Ctx{ThreadPool}; m_CidStore->ScrubStorage(Ctx); m_ProjectStore->ScrubStorage(Ctx); m_StructuredCacheService->ScrubStorage(Ctx); const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", NiceTimeSpanMs(ElapsedTimeMs), NiceBytes(Ctx.ScrubbedBytes()), Ctx.ScrubbedChunks(), NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void ZenServer::Flush() { if (m_CidStore) m_CidStore->Flush(); if (m_StructuredCacheService) m_StructuredCacheService->Flush(); if (m_ProjectStore) m_ProjectStore->Flush(); } void ZenServer::HandleStatusRequest(HttpServerRequest& Request) { CbObjectWriter Cbo; Cbo << "ok" << true; Cbo << "state" << ToString(m_CurrentState); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } std::string_view ZenServer::ToString(ServerState Value) { switch (Value) { case kInitializing: return "initializing"sv; case kRunning: return "running"sv; case kShuttingDown: return "shutdown"sv; default: return "unknown"sv; } } #if ZEN_WITH_TESTS void zenserver_forcelinktests() { zen::prj_forcelink(); } #endif } // namespace zen