diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/zenstorageserver.cpp | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/storage/zenstorageserver.cpp')
| -rw-r--r-- | src/zenserver/storage/zenstorageserver.cpp | 961 |
1 files changed, 961 insertions, 0 deletions
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp new file mode 100644 index 000000000..73896512d --- /dev/null +++ b/src/zenserver/storage/zenstorageserver.cpp @@ -0,0 +1,961 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenstorageserver.h" + +#include <zenbase/refcount.h> +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/config.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/jobqueue.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> +#include <zencore/session.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpserver.h> +#include <zenremotestore/jupiter/jupiterclient.h> +#include <zenstore/buildstore/buildstore.h> +#include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> +#include <zenstore/vfsimpl.h> +#include <zenstore/workspaces.h> +#include <zenutil/service.h> +#include <zenutil/workerpools.h> +#include <zenutil/zenserverprocess.h> + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#endif + +#if ZEN_PLATFORM_LINUX +# include <pwd.h> +#endif + +#if ZEN_PLATFORM_MAC +# include <pwd.h> +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <exception> +#include <set> + +////////////////////////////////////////////////////////////////////////// + +#include "diag/logging.h" +#include "storageconfig.h" + +#include <zencore/memory/llm.h> + +namespace zen { + +namespace utils { + asio::error_code ResolveHostname(asio::io_context& Ctx, + std::string_view Host, + std::string_view DefaultPort, + std::vector<std::string>& OutEndpoints) + { + std::string_view Port = DefaultPort; + + if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) + { + Port = Host.substr(Idx + 1); + Host = Host.substr(0, Idx); + } + + asio::ip::tcp::resolver Resolver(Ctx); + + asio::error_code ErrorCode; + asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); + + if (!ErrorCode) + { + for (const asio::ip::tcp::endpoint Ep : Endpoints) + { + OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); + } + } + + return ErrorCode; + } +} // namespace utils + +using namespace std::literals; + +ZenStorageServer::ZenStorageServer() +{ +} + +ZenStorageServer::~ZenStorageServer() +{ +} + +int +ZenStorageServer::Initialize(const ZenStorageServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenStorageServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerOptions, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; + m_StartupScrubOptions = ServerOptions.ScrubOptions; + + InitializeState(ServerOptions); + InitializeServices(ServerOptions); + RegisterServices(); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenStorageServer::RegisterServices() +{ + m_Http->RegisterService(*m_AuthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics + +#if ZEN_WITH_TESTS + m_Http->RegisterService(m_TestingService); +#endif + + if (m_StructuredCacheService) + { + m_Http->RegisterService(*m_StructuredCacheService); + } + + if (m_UpstreamService) + { + m_Http->RegisterService(*m_UpstreamService); + } + + if (m_HttpProjectService) + { + m_Http->RegisterService(*m_HttpProjectService); + } + + if (m_HttpWorkspacesService) + { + m_Http->RegisterService(*m_HttpWorkspacesService); + } + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + + if (m_FrontendService) + { + m_Http->RegisterService(*m_FrontendService); + } + + if (m_ObjStoreService) + { + m_Http->RegisterService(*m_ObjStoreService); + } + + if (m_BuildStoreService) + { + m_Http->RegisterService(*m_BuildStoreService); + } + +#if ZEN_WITH_VFS + m_Http->RegisterService(*m_VfsService); +#endif // ZEN_WITH_VFS + + m_Http->RegisterService(*m_AdminService); +} + +void +ZenStorageServer::InitializeServices(const ZenStorageServerOptions& ServerOptions) +{ + InitializeAuthentication(ServerOptions); + + ZEN_INFO("initializing storage"); + + CidStoreConfiguration Config; + Config.RootDirectory = m_DataRoot / "cas"; + + m_CidStore = std::make_unique<CidStore>(m_GcManager); + m_CidStore->Initialize(Config); + + ZEN_INFO("instantiating project service"); + + m_JobQueue = MakeJobQueue(8, "bgjobs"); + + m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, ProjectStore::Configuration{}); + m_HttpProjectService.reset( + new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr, *m_OpenProcessCache, *m_JobQueue}); + + if (ServerOptions.WorksSpacesConfig.Enabled) + { + m_Workspaces.reset(new Workspaces()); + m_HttpWorkspacesService.reset( + new HttpWorkspacesService(m_StatusService, + m_StatsService, + {.SystemRootDir = ServerOptions.SystemRootDir, + .AllowConfigurationChanges = ServerOptions.WorksSpacesConfig.AllowConfigurationChanges}, + *m_Workspaces)); + } + + if (ServerOptions.BuildStoreConfig.Enabled) + { + CidStoreConfiguration BuildCidConfig; + BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas"; + m_BuildCidStore = std::make_unique<CidStore>(m_GcManager); + m_BuildCidStore->Initialize(BuildCidConfig); + + BuildStoreConfig BuildsCfg; + BuildsCfg.RootDirectory = m_DataRoot / "builds"; + BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit; + m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore); + } + + if (ServerOptions.StructuredCacheConfig.Enabled) + { + InitializeStructuredCache(ServerOptions); + } + else + { + ZEN_INFO("NOT instantiating structured cache service"); + } + + if (ServerOptions.ObjectStoreEnabled) + { + ObjectStoreConfig ObjCfg; + ObjCfg.RootDirectory = m_DataRoot / "obj"; + + for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) + { + ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; + NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; + ObjCfg.Buckets.push_back(std::move(NewBucket)); + } + + m_ObjStoreService = std::make_unique<HttpObjectStoreService>(m_StatusService, std::move(ObjCfg)); + } + + if (ServerOptions.BuildStoreConfig.Enabled) + { + m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatusService, m_StatsService, *m_BuildStore); + } + +#if ZEN_WITH_VFS + m_VfsServiceImpl = std::make_unique<VfsServiceImpl>(); + m_VfsServiceImpl->AddService(Ref<ProjectStore>(m_ProjectStore)); + m_VfsServiceImpl->AddService(Ref<ZenCacheStore>(m_CacheStore)); + + m_VfsService = std::make_unique<VfsService>(m_StatusService, m_VfsServiceImpl.get()); +#endif // ZEN_WITH_VFS + + ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", + ServerOptions.GcConfig.Enabled, + NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull), + NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull)); + + GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), + .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), + .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds), + .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, + .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, + .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, + .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, + .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds), + .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1_Deprecated, + .CompactBlockUsageThresholdPercent = ServerOptions.GcConfig.CompactBlockUsageThresholdPercent, + .Verbose = ServerOptions.GcConfig.Verbose, + .SingleThreaded = ServerOptions.GcConfig.SingleThreaded, + .AttachmentPassCount = ServerOptions.GcConfig.AttachmentPassCount}; + m_GcScheduler.Initialize(GcConfig); + + // Create and register admin interface last to make sure all is properly initialized + m_AdminService = std::make_unique<HttpAdminService>( + m_GcScheduler, + *m_JobQueue, + m_CacheStore.Get(), + [this]() { Flush(); }, + HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, + .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", + .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, + ServerOptions); +} + +void +ZenStorageServer::InitializeAuthentication(const ZenStorageServerOptions& ServerOptions) +{ + // Setup authentication manager + { + ZEN_TRACE_CPU("ZenStorageServer::InitAuth"); + std::string EncryptionKey = ServerOptions.EncryptionKey; + + if (EncryptionKey.empty()) + { + EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; + + if (ServerOptions.IsDedicated) + { + ZEN_WARN("Using default encryption key for authentication state"); + } + } + + std::string EncryptionIV = ServerOptions.EncryptionIV; + + if (EncryptionIV.empty()) + { + EncryptionIV = "0123456789abcdef"; + + if (ServerOptions.IsDedicated) + { + ZEN_WARN("Using default encryption initialization vector for authentication state"); + } + } + + m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", + .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), + .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); + + for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) + { + m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); + } + } + + m_AuthService = std::make_unique<HttpAuthService>(*m_AuthMgr); +} + +void +ZenStorageServer::InitializeState(const ZenStorageServerOptions& ServerOptions) +{ + ZEN_TRACE_CPU("ZenStorageServer::InitializeState"); + + // Check root manifest to deal with schema versioning + + bool WipeState = false; + std::string WipeReason = "Unspecified"; + + if (ServerOptions.IsCleanStart) + { + WipeState = true; + WipeReason = "clean start requested"; + } + + bool UpdateManifest = false; + std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; + Oid StateId = Oid::Zero; + DateTime CreatedWhen{0}; + + if (!WipeState) + { + FileContents ManifestData = ReadFile(ManifestPath); + + if (ManifestData.ErrorCode) + { + if (ServerOptions.IsFirstRun) + { + ZEN_INFO("Initializing state at '{}'", m_DataRoot); + + UpdateManifest = true; + } + else + { + WipeState = true; + WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); + } + } + else + { + IoBuffer Manifest = ManifestData.Flatten(); + + if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); + ValidationResult != CbValidateError::None) + { + ZEN_WARN("Manifest validation failed: {}, state will be wiped", zen::ToString(ValidationResult)); + + WipeState = true; + WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, zen::ToString(ValidationResult)); + } + else + { + m_RootManifest = LoadCompactBinaryObject(Manifest); + + const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); + StateId = m_RootManifest["state_id"].AsObjectId(); + CreatedWhen = m_RootManifest["created"].AsDateTime(); + + if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) + { + std::filesystem::path ManifestSkipSchemaChangePath = m_DataRoot / "root_manifest.ignore_schema_mismatch"; + if (ManifestVersion != 0 && IsFile(ManifestSkipSchemaChangePath)) + { + ZEN_INFO( + "Schema version {} found in '{}' does not match {}, ignoring mismatch due to existance of '{}' and updating " + "schema version", + ManifestVersion, + ManifestPath, + ZEN_CFG_SCHEMA_VERSION, + ManifestSkipSchemaChangePath); + UpdateManifest = true; + } + else + { + WipeState = true; + WipeReason = + fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); + } + } + } + } + } + + if (StateId == Oid::Zero) + { + StateId = Oid::NewOid(); + UpdateManifest = true; + } + + const DateTime Now = DateTime::Now(); + + if (CreatedWhen.GetTicks() == 0) + { + CreatedWhen = Now; + UpdateManifest = true; + } + + // Handle any state wipe + + if (WipeState) + { + ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); + + std::error_code Ec; + for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) + { + if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) + { + ZEN_INFO("Deleting '{}'", DirEntry.path()); + + DeleteDirectories(DirEntry.path(), Ec); + + if (Ec) + { + ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); + } + } + } + + ZEN_INFO("Wiped all directories in data root"); + + UpdateManifest = true; + } + + // Write manifest + + { + CbObjectWriter Cbo; + Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId; + + m_RootManifest = Cbo.Save(); + + if (UpdateManifest) + { + TemporaryFile::SafeWriteFile(ManifestPath, m_RootManifest.GetBuffer().GetView()); + } + + if (!ServerOptions.IsTest) + { + try + { + EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Unable to emit central manifest: ", Ex.what()); + } + } + } + + // Write state marker + + { + std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; + static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv; + WriteFile(StateMarkerPath, IoBuffer(IoBuffer::Wrap, StateMarkerContent.data(), StateMarkerContent.size())); + + EnqueueStateMarkerTimer(); + } + + EnqueueStateExitFlagTimer(); +} + +void +ZenStorageServer::InitializeStructuredCache(const ZenStorageServerOptions& ServerOptions) +{ + ZEN_TRACE_CPU("ZenStorageServer::InitializeStructuredCache"); + + using namespace std::literals; + + ZEN_INFO("instantiating structured cache service"); + ZenCacheStore::Configuration Config; + Config.AllowAutomaticCreationOfNamespaces = true; + Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, + .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}; + + for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs) + { + const std::string& BucketName = It.first; + const ZenStructuredCacheBucketConfig& ZenBucketConfig = It.second; + ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize, + .PayloadAlignment = ZenBucketConfig.PayloadAlignment, + .MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold, + .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold, + .LimitOverwrites = ZenBucketConfig.LimitOverwrites}; + Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig); + } + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.PayloadAlignment = + ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = + ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = + ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites; + Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; + Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; + Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; + + if (ServerOptions.IsDedicated) + { + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024; + } + + m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker()); + m_OpenProcessCache = std::make_unique<OpenProcessCache>(); + + const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; + + UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } + + m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr); + m_UpstreamCache->Initialize(); + + if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { + // Zen upstream + { + std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls; + if (!UpstreamConfig.ZenConfig.Dns.empty()) + { + for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) + { + if (!Dns.empty()) + { + const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls); + if (Err) + { + ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message()); + } + } + } + } + + std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); + + if (!ZenUrls.empty()) + { + const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; + + std::unique_ptr<UpstreamEndpoint> ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint( + {.Name = ZenEndpointName, + .Urls = ZenUrls, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); + + m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } + } + + // Jupiter upstream + if (UpstreamConfig.JupiterConfig.Url.empty() == false) + { + std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; + + auto Options = JupiterClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, + .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; + + std::unique_ptr<UpstreamEndpoint> JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); + + m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + } + } + + m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore, + *m_CidStore, + m_StatsService, + m_StatusService, + *m_UpstreamCache, + m_GcManager.GetDiskWriteBlocker(), + *m_OpenProcessCache); + + m_StatsReporter.AddProvider(m_CacheStore.Get()); + m_StatsReporter.AddProvider(m_CidStore.get()); + m_StatsReporter.AddProvider(m_BuildCidStore.get()); +} + +void +ZenStorageServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + ZEN_INFO( + "__________ _________ __ \n" + "\\____ /____ ____ / _____// |_ ___________ ____ \n" + " / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ \n" + " / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ \n" + "/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >\n" + " \\/ \\/ \\/ \\/ \\/ \n"); + } + + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId()); + +#if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +#endif + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +#endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + if (!m_StartupScrubOptions.empty()) + { + using namespace std::literals; + + ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions); + + bool DoScrub = true; + bool DoWait = false; + GcScheduler::TriggerScrubParams ScrubParams; + + ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) { + if (Token == "nocas"sv) + { + ScrubParams.SkipCas = true; + } + else if (Token == "nodelete"sv) + { + ScrubParams.SkipDelete = true; + } + else if (Token == "nogc"sv) + { + ScrubParams.SkipGc = true; + } + else if (Token == "no"sv) + { + DoScrub = false; + } + else if (Token == "wait"sv) + { + DoWait = true; + } + return true; + }); + + if (DoScrub) + { + m_GcScheduler.TriggerScrub(ScrubParams); + + if (DoWait) + { + auto State = m_GcScheduler.Status(); + + while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped)) + { + Sleep(500); + + State = m_GcScheduler.Status(); + } + + ZEN_INFO("waiting for Scrub/GC to complete..."); + + while (State == GcSchedulerStatus::kRunning) + { + Sleep(500); + + State = m_GcScheduler.Status(); + } + + ZEN_INFO("Scrub/GC completed"); + } + } + } + + if (m_IsPowerCycle) + { + ZEN_INFO("Power cycle mode enabled -- shutting down"); + RequestExit(0); + } + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +void +ZenStorageServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenStorageServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + if (m_Http) + { + m_Http->Close(); + } + + if (m_JobQueue) + { + m_JobQueue->Stop(); + } + + m_StatsReporter.Shutdown(); + m_GcScheduler.Shutdown(); + + Flush(); + + m_AdminService.reset(); + m_VfsService.reset(); + m_VfsServiceImpl.reset(); + m_ObjStoreService.reset(); + m_FrontendService.reset(); + + m_BuildStoreService.reset(); + m_BuildStore = {}; + m_BuildCidStore.reset(); + + m_StructuredCacheService.reset(); + m_UpstreamService.reset(); + m_UpstreamCache.reset(); + m_CacheStore = {}; + m_OpenProcessCache.reset(); + + m_HttpWorkspacesService.reset(); + m_Workspaces.reset(); + m_HttpProjectService.reset(); + m_ProjectStore = {}; + m_CidStore.reset(); + m_AuthService.reset(); + m_AuthMgr.reset(); + m_Http = {}; + + ShutdownWorkerPools(); + + m_JobQueue.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +void +ZenStorageServer::EnqueueStateMarkerTimer() +{ + ZEN_MEMSCOPE(GetZenserverTag()); + m_StateMarkerTimer.expires_after(std::chrono::seconds(5)); + m_StateMarkerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); + EnsureIoRunner(); +} + +void +ZenStorageServer::CheckStateMarker() +{ + ZEN_MEMSCOPE(GetZenserverTag()); + std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; + try + { + if (!IsFile(StateMarkerPath)) + { + ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); + RequestExit(1); + return; + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); + RequestExit(1); + return; + } + EnqueueStateMarkerTimer(); +} + +void +ZenStorageServer::Flush() +{ + ZEN_TRACE_CPU("ZenStorageServer::Flush"); + + if (m_CidStore) + m_CidStore->Flush(); + + if (m_StructuredCacheService) + m_StructuredCacheService->Flush(); + + if (m_ProjectStore) + m_ProjectStore->Flush(); + + if (m_BuildCidStore) + m_BuildCidStore->Flush(); +} + +////////////////////////////////////////////////////////////////////////// + +ZenStorageServerMain::ZenStorageServerMain(ZenStorageServerOptions& ServerOptions) +: ZenServerMain(ServerOptions) +, m_ServerOptions(ServerOptions) +{ +} + +void +ZenStorageServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenStorageServer Server; + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + auto ServerCleanup = MakeGuard([&Server] { Server.Cleanup(); }); + + int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + // Server.Initialize has already logged what the issue is - just exit with failure code here. + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_monitor"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen |