// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstorageserver.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif #if ZEN_PLATFORM_LINUX # include #endif #if ZEN_PLATFORM_MAC # include #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include ////////////////////////////////////////////////////////////////////////// #include "diag/logging.h" #include "storageconfig.h" #include namespace zen { namespace utils { asio::error_code ResolveHostname(asio::io_context& Ctx, std::string_view Host, std::string_view DefaultPort, std::vector& OutEndpoints) { std::string_view Port = DefaultPort; if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) { Port = Host.substr(Idx + 1); Host = Host.substr(0, Idx); } asio::ip::tcp::resolver Resolver(Ctx); asio::error_code ErrorCode; asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); if (!ErrorCode) { for (const asio::ip::tcp::endpoint Ep : Endpoints) { OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); } } return ErrorCode; } } // namespace utils using namespace std::literals; ZenStorageServer::ZenStorageServer() { } ZenStorageServer::~ZenStorageServer() { Cleanup(); } int ZenStorageServer::Initialize(const ZenStorageServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_TRACE_CPU("ZenStorageServer::Initialize"); ZEN_MEMSCOPE(GetZenserverTag()); const int EffectiveBasePort = ZenServerBase::Initialize(ServerOptions, ServerEntry); if (EffectiveBasePort < 0) { return EffectiveBasePort; } m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; m_StartupScrubOptions = ServerOptions.ScrubOptions; InitializeState(ServerOptions); InitializeServices(ServerOptions); RegisterServices(); ZenServerBase::Finalize(); return EffectiveBasePort; } void ZenStorageServer::RegisterServices() { m_Http->RegisterService(*m_AuthService); m_Http->RegisterService(m_StatsService); m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics #if ZEN_WITH_TESTS m_Http->RegisterService(m_TestingService); #endif if (m_StructuredCacheService) { m_Http->RegisterService(*m_StructuredCacheService); } if (m_UpstreamService) { m_Http->RegisterService(*m_UpstreamService); } if (m_HttpProjectService) { m_Http->RegisterService(*m_HttpProjectService); } if (m_HttpWorkspacesService) { m_Http->RegisterService(*m_HttpWorkspacesService); } m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } if (m_ObjStoreService) { m_Http->RegisterService(*m_ObjStoreService); } if (m_BuildStoreService) { m_Http->RegisterService(*m_BuildStoreService); } #if ZEN_WITH_VFS m_Http->RegisterService(*m_VfsService); #endif // ZEN_WITH_VFS m_Http->RegisterService(*m_AdminService); #if ZEN_WITH_COMPUTE_SERVICES if (m_HttpFunctionService) { m_Http->RegisterService(*m_HttpFunctionService); } #endif } void ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions) { ZEN_OTEL_SPAN("InitializeServices"); InitializeAuthentication(ServerOptions); ZEN_INFO("initializing storage"); CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); ZEN_INFO("instantiating project service"); m_JobQueue = MakeJobQueue(8, "bgjobs"); m_OpenProcessCache = std::make_unique(); m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, ProjectStore::Configuration{}); m_HttpProjectService.reset( new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr, *m_OpenProcessCache, *m_JobQueue}); if (ServerOptions.WorksSpacesConfig.Enabled) { ZEN_OTEL_SPAN("InitializeWorkspaces"); m_Workspaces.reset(new Workspaces()); m_HttpWorkspacesService.reset( new HttpWorkspacesService(m_StatusService, m_StatsService, {.SystemRootDir = ServerOptions.SystemRootDir, .AllowConfigurationChanges = ServerOptions.WorksSpacesConfig.AllowConfigurationChanges}, *m_Workspaces)); } if (ServerOptions.BuildStoreConfig.Enabled) { CidStoreConfiguration BuildCidConfig; BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas"; m_BuildCidStore = std::make_unique(m_GcManager); m_BuildCidStore->Initialize(BuildCidConfig); BuildStoreConfig BuildsCfg; BuildsCfg.RootDirectory = m_DataRoot / "builds"; BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit; m_BuildStore = std::make_unique(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore); } if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); } else { ZEN_INFO("NOT instantiating structured cache service"); } if (ServerOptions.ObjectStoreEnabled) { ZEN_OTEL_SPAN("InitializeObjectStore"); ObjectStoreConfig ObjCfg; ObjCfg.RootDirectory = m_DataRoot / "obj"; for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) { ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; ObjCfg.Buckets.push_back(std::move(NewBucket)); } m_ObjStoreService = std::make_unique(m_StatusService, std::move(ObjCfg)); } if (ServerOptions.BuildStoreConfig.Enabled) { ZEN_OTEL_SPAN("InitializeBuildStore"); m_BuildStoreService = std::make_unique(m_StatusService, m_StatsService, *m_BuildStore); } #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeEnabled) { ZEN_OTEL_SPAN("InitializeComputeService"); m_HttpFunctionService = std::make_unique(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions"); } #endif #if ZEN_WITH_VFS m_VfsServiceImpl = std::make_unique(); m_VfsServiceImpl->AddService(Ref(m_ProjectStore)); m_VfsServiceImpl->AddService(Ref(m_CacheStore)); m_VfsService = std::make_unique(m_StatusService, m_VfsServiceImpl.get()); #endif // ZEN_WITH_VFS ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", ServerOptions.GcConfig.Enabled, NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull), NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull)); GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds), .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1_Deprecated, .CompactBlockUsageThresholdPercent = ServerOptions.GcConfig.CompactBlockUsageThresholdPercent, .Verbose = ServerOptions.GcConfig.Verbose, .SingleThreaded = ServerOptions.GcConfig.SingleThreaded, .AttachmentPassCount = ServerOptions.GcConfig.AttachmentPassCount}; m_GcScheduler.Initialize(GcConfig); // Create and register admin interface last to make sure all is properly initialized m_AdminService = std::make_unique( m_GcScheduler, *m_JobQueue, m_CacheStore.Get(), [this]() { Flush(); }, HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.LoggingConfig.AbsLogFile, .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, ServerOptions); } void ZenStorageServer::InitializeAuthentication(const ZenStorageServerConfig& ServerOptions) { // Setup authentication manager { ZEN_TRACE_CPU("ZenStorageServer::InitAuth"); std::string EncryptionKey = ServerOptions.EncryptionKey; if (EncryptionKey.empty()) { EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; if (ServerOptions.IsDedicated) { ZEN_WARN("Using default encryption key for authentication state"); } } std::string EncryptionIV = ServerOptions.EncryptionIV; if (EncryptionIV.empty()) { EncryptionIV = "0123456789abcdef"; if (ServerOptions.IsDedicated) { ZEN_WARN("Using default encryption initialization vector for authentication state"); } } m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) { m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); } } m_AuthService = std::make_unique(*m_AuthMgr); } void ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions) { ZEN_OTEL_SPAN("InitializeState"); ZEN_TRACE_CPU("ZenStorageServer::InitializeState"); // Check root manifest to deal with schema versioning bool WipeState = false; std::string WipeReason = "Unspecified"; if (ServerOptions.IsCleanStart) { WipeState = true; WipeReason = "clean start requested"; } bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; Oid StateId = Oid::Zero; DateTime CreatedWhen{0}; if (!WipeState) { FileContents ManifestData = ReadFile(ManifestPath); if (ManifestData.ErrorCode) { if (ServerOptions.IsFirstRun) { ZEN_INFO("Initializing state at '{}'", m_DataRoot); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); } } else { IoBuffer Manifest = ManifestData.Flatten(); if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); ValidationResult != CbValidateError::None) { ZEN_WARN("Manifest validation failed: {}, state will be wiped", zen::ToString(ValidationResult)); WipeState = true; WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, zen::ToString(ValidationResult)); } else { m_RootManifest = LoadCompactBinaryObject(Manifest); const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); const int32_t ForceScrubVersion = m_RootManifest["force_scrub_version"].AsInt32(0); StateId = m_RootManifest["state_id"].AsObjectId(); CreatedWhen = m_RootManifest["created"].AsDateTime(); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { std::filesystem::path ManifestSkipSchemaChangePath = m_DataRoot / "root_manifest.ignore_schema_mismatch"; if (ManifestVersion != 0 && IsFile(ManifestSkipSchemaChangePath)) { ZEN_INFO( "Schema version {} found in '{}' does not match {}, ignoring mismatch due to existance of '{}' and updating " "schema version", ManifestVersion, ManifestPath, ZEN_CFG_SCHEMA_VERSION, ManifestSkipSchemaChangePath); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); } } else if (ForceScrubVersion != ZEN_CFG_DATA_FORCE_SCRUB_VERSION) { m_StartupScrubOptions = "nogc,wait"; } } } } if (StateId == Oid::Zero) { StateId = Oid::NewOid(); UpdateManifest = true; } const DateTime Now = DateTime::Now(); if (CreatedWhen.GetTicks() == 0) { CreatedWhen = Now; UpdateManifest = true; } // Handle any state wipe if (WipeState) { ZEN_OTEL_SPAN("WipingState"); ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); std::error_code Ec; for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) { if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) { ZEN_INFO("Deleting '{}'", DirEntry.path()); DeleteDirectories(DirEntry.path(), Ec); if (Ec) { ZEN_WARN("Delete of '{}' failed: '{}'", DirEntry.path(), Ec.message()); } } } ZEN_INFO("Wiped all directories in data root"); UpdateManifest = true; } // Write manifest { CbObjectWriter Cbo; Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "force_scrub_version" << ZEN_CFG_DATA_FORCE_SCRUB_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId; m_RootManifest = Cbo.Save(); if (UpdateManifest) { TemporaryFile::SafeWriteFile(ManifestPath, m_RootManifest.GetBuffer().GetView()); } if (!ServerOptions.IsTest) { try { EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath); } catch (const std::exception& Ex) { ZEN_WARN("Unable to emit central manifest: ", Ex.what()); } } } // Write state marker { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv; WriteFile(StateMarkerPath, IoBuffer(IoBuffer::Wrap, StateMarkerContent.data(), StateMarkerContent.size())); EnqueueStateMarkerTimer(); } EnqueueStateExitFlagTimer(); } void ZenStorageServer::InitializeStructuredCache(const ZenStorageServerConfig& ServerOptions) { ZEN_OTEL_SPAN("InitializeStructuredCache"); ZEN_TRACE_CPU("ZenStorageServer::InitializeStructuredCache"); using namespace std::literals; ZEN_INFO("instantiating structured cache service"); ZenCacheStore::Configuration Config; Config.AllowAutomaticCreationOfNamespaces = true; Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}; for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs) { const std::string& BucketName = It.first; const ZenStructuredCacheBucketConfig& ZenBucketConfig = It.second; ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize, .PayloadAlignment = ZenBucketConfig.PayloadAlignment, .MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold, .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold, .LimitOverwrites = ZenBucketConfig.LimitOverwrites}; Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig); } Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.PayloadAlignment = ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold, Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites; Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; if (ServerOptions.IsDedicated) { Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024; } m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker()); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast(UpstreamConfig.UpstreamThreadCount); } m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); m_UpstreamService = std::make_unique(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { // Zen upstream { std::vector ZenUrls = UpstreamConfig.ZenConfig.Urls; if (!UpstreamConfig.ZenConfig.Dns.empty()) { for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) { if (!Dns.empty()) { const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls); if (Err) { ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message()); } } } } std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); if (!ZenUrls.empty()) { const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; std::unique_ptr ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint( {.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } // Jupiter upstream if (UpstreamConfig.JupiterConfig.Url.empty() == false) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; auto Options = JupiterClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.JupiterConfig.Url, .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; std::unique_ptr JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } m_StructuredCacheService = std::make_unique(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache, m_GcManager.GetDiskWriteBlocker(), *m_OpenProcessCache); m_StatsReporter.AddProvider(m_CacheStore.Get()); m_StatsReporter.AddProvider(m_CidStore.get()); m_StatsReporter.AddProvider(m_BuildCidStore.get()); } void ZenStorageServer::Run() { if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { ZEN_INFO( "__________ _________ __ \n" "\\____ /____ ____ / _____// |_ ___________ ____ \n" " / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ \n" " / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ \n" "/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >\n" " \\/ \\/ \\/ \\/ \\/ \n"); ExtendableStringBuilder<256> BuildOptions; GetBuildOptions(BuildOptions, '\n'); ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); } ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId()); if (m_FrontendService) { ZEN_INFO("frontend link: {}", m_Http->GetServiceUri(m_FrontendService.get())); } else { ZEN_INFO("frontend service disabled"); } #if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } #endif #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } #endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode; if (!m_StartupScrubOptions.empty()) { using namespace std::literals; ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions); bool DoScrub = true; bool DoWait = false; GcScheduler::TriggerScrubParams ScrubParams; ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) { if (Token == "nocas"sv) { ScrubParams.SkipCas = true; } else if (Token == "nodelete"sv) { ScrubParams.SkipDelete = true; } else if (Token == "nogc"sv) { ScrubParams.SkipGc = true; } else if (Token == "no"sv) { DoScrub = false; } else if (Token == "wait"sv) { DoWait = true; } return true; }); if (DoScrub) { ZEN_INFO("Triggering Scrub/GC operation..."); while (!m_GcScheduler.TriggerScrub(ScrubParams)) { ZEN_INFO("GC scheduler is busy, waiting for it to complete..."); Sleep(500); } if (DoWait) { while (m_GcScheduler.IsManualTriggerPresent()) { ZEN_INFO("Waiting for Scrub/GC to complete..."); Sleep(2000); } ZEN_INFO("Scrub/GC completed"); } } } SetNewState(kRunning); OnReady(); if (m_IsPowerCycle) { ZEN_INFO("Power cycle mode enabled -- shutting down"); RequestExit(0); } m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } void ZenStorageServer::Cleanup() { ZEN_TRACE_CPU("ZenStorageServer::Cleanup"); ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } if (m_Http) { m_Http->Close(); } if (m_JobQueue) { m_JobQueue->Stop(); } m_StatsReporter.Shutdown(); m_GcScheduler.Shutdown(); Flush(); #if ZEN_WITH_COMPUTE_SERVICES m_HttpFunctionService.reset(); #endif m_AdminService.reset(); m_VfsService.reset(); m_VfsServiceImpl.reset(); m_ObjStoreService.reset(); m_FrontendService.reset(); m_BuildStoreService.reset(); m_BuildStore = {}; m_BuildCidStore.reset(); m_StructuredCacheService.reset(); m_UpstreamService.reset(); m_UpstreamCache.reset(); m_CacheStore = {}; m_HttpWorkspacesService.reset(); m_Workspaces.reset(); m_HttpProjectService.reset(); m_ProjectStore = {}; m_CidStore.reset(); m_AuthService.reset(); m_AuthMgr.reset(); m_OpenProcessCache.reset(); m_Http = {}; ShutdownWorkerPools(); m_JobQueue.reset(); } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void ZenStorageServer::EnqueueStateMarkerTimer() { ZEN_MEMSCOPE(GetZenserverTag()); m_StateMarkerTimer.expires_after(std::chrono::seconds(5)); m_StateMarkerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); EnsureIoRunner(); } void ZenStorageServer::CheckStateMarker() { ZEN_MEMSCOPE(GetZenserverTag()); std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; try { if (!IsFile(StateMarkerPath)) { ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); RequestExit(1); return; } } catch (const std::exception& Ex) { ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); RequestExit(1); return; } EnqueueStateMarkerTimer(); } void ZenStorageServer::Flush() { ZEN_TRACE_CPU("ZenStorageServer::Flush"); if (m_CidStore) m_CidStore->Flush(); if (m_StructuredCacheService) m_StructuredCacheService->Flush(); if (m_ProjectStore) m_ProjectStore->Flush(); if (m_BuildCidStore) m_BuildCidStore->Flush(); } ////////////////////////////////////////////////////////////////////////// ZenStorageServerMain::ZenStorageServerMain(ZenStorageServerConfig& ServerOptions) : ZenServerMain(ServerOptions) , m_ServerOptions(ServerOptions) { } ZenStorageServerMain::~ZenStorageServerMain() { } void ZenStorageServerMain::InitializeLogging() { InitializeServerLogging(m_ServerOptions, /* WithCacheService */ true); } void ZenStorageServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZenStorageServer Server; Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); Server.SetDedicatedMode(m_ServerOptions.IsDedicated); int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); if (EffectiveBasePort == -1) { // Server.Initialize has already logged what the issue is - just exit with failure code here. std::exit(1); } Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != m_ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); m_ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { SetCurrentThreadName("shutdown_mon"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { ReportServiceStatus(ServiceStatus::Stopping); if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { std::error_code Ec; m_LockFile.Update(MakeLockData(true), Ec); ReportServiceStatus(ServiceStatus::Running); NotifyReady(); }); Server.Run(); } } // namespace zen