aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/storage/zenstorageserver.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-10-14 11:32:16 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 11:32:16 +0200
commitca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch)
tree005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/zenstorageserver.cpp
parentmake asiohttp work without IPv6 (#562) (diff)
downloadzen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz
zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree * move config into config/ * also move admin service into storage since it mostly has storage related functionality * header consolidation
Diffstat (limited to 'src/zenserver/storage/zenstorageserver.cpp')
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp961
1 files changed, 961 insertions, 0 deletions
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
new file mode 100644
index 000000000..73896512d
--- /dev/null
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -0,0 +1,961 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstorageserver.h"
+
+#include <zenbase/refcount.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/config.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/jobqueue.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/sentryintegration.h>
+#include <zencore/session.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpserver.h>
+#include <zenremotestore/jupiter/jupiterclient.h>
+#include <zenstore/buildstore/buildstore.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
+#include <zenstore/vfsimpl.h>
+#include <zenstore/workspaces.h>
+#include <zenutil/service.h>
+#include <zenutil/workerpools.h>
+#include <zenutil/zenserverprocess.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
+#if ZEN_PLATFORM_LINUX
+# include <pwd.h>
+#endif
+
+#if ZEN_PLATFORM_MAC
+# include <pwd.h>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <exception>
+#include <set>
+
+//////////////////////////////////////////////////////////////////////////
+
+#include "diag/logging.h"
+#include "storageconfig.h"
+
+#include <zencore/memory/llm.h>
+
+namespace zen {
+
+namespace utils {
+ asio::error_code ResolveHostname(asio::io_context& Ctx,
+ std::string_view Host,
+ std::string_view DefaultPort,
+ std::vector<std::string>& OutEndpoints)
+ {
+ std::string_view Port = DefaultPort;
+
+ if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos)
+ {
+ Port = Host.substr(Idx + 1);
+ Host = Host.substr(0, Idx);
+ }
+
+ asio::ip::tcp::resolver Resolver(Ctx);
+
+ asio::error_code ErrorCode;
+ asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode);
+
+ if (!ErrorCode)
+ {
+ for (const asio::ip::tcp::endpoint Ep : Endpoints)
+ {
+ OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port()));
+ }
+ }
+
+ return ErrorCode;
+ }
+} // namespace utils
+
+using namespace std::literals;
+
+ZenStorageServer::ZenStorageServer()
+{
+}
+
+ZenStorageServer::~ZenStorageServer()
+{
+}
+
+int
+ZenStorageServer::Initialize(const ZenStorageServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerOptions, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ m_DebugOptionForcedCrash = ServerOptions.ShouldCrash;
+ m_StartupScrubOptions = ServerOptions.ScrubOptions;
+
+ InitializeState(ServerOptions);
+ InitializeServices(ServerOptions);
+ RegisterServices();
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+void
+ZenStorageServer::RegisterServices()
+{
+ m_Http->RegisterService(*m_AuthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
+
+#if ZEN_WITH_TESTS
+ m_Http->RegisterService(m_TestingService);
+#endif
+
+ if (m_StructuredCacheService)
+ {
+ m_Http->RegisterService(*m_StructuredCacheService);
+ }
+
+ if (m_UpstreamService)
+ {
+ m_Http->RegisterService(*m_UpstreamService);
+ }
+
+ if (m_HttpProjectService)
+ {
+ m_Http->RegisterService(*m_HttpProjectService);
+ }
+
+ if (m_HttpWorkspacesService)
+ {
+ m_Http->RegisterService(*m_HttpWorkspacesService);
+ }
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+
+ if (m_FrontendService)
+ {
+ m_Http->RegisterService(*m_FrontendService);
+ }
+
+ if (m_ObjStoreService)
+ {
+ m_Http->RegisterService(*m_ObjStoreService);
+ }
+
+ if (m_BuildStoreService)
+ {
+ m_Http->RegisterService(*m_BuildStoreService);
+ }
+
+#if ZEN_WITH_VFS
+ m_Http->RegisterService(*m_VfsService);
+#endif // ZEN_WITH_VFS
+
+ m_Http->RegisterService(*m_AdminService);
+}
+
+void
+ZenStorageServer::InitializeServices(const ZenStorageServerOptions& ServerOptions)
+{
+ InitializeAuthentication(ServerOptions);
+
+ ZEN_INFO("initializing storage");
+
+ CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
+
+ m_CidStore = std::make_unique<CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+
+ ZEN_INFO("instantiating project service");
+
+ m_JobQueue = MakeJobQueue(8, "bgjobs");
+
+ m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, ProjectStore::Configuration{});
+ m_HttpProjectService.reset(
+ new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr, *m_OpenProcessCache, *m_JobQueue});
+
+ if (ServerOptions.WorksSpacesConfig.Enabled)
+ {
+ m_Workspaces.reset(new Workspaces());
+ m_HttpWorkspacesService.reset(
+ new HttpWorkspacesService(m_StatusService,
+ m_StatsService,
+ {.SystemRootDir = ServerOptions.SystemRootDir,
+ .AllowConfigurationChanges = ServerOptions.WorksSpacesConfig.AllowConfigurationChanges},
+ *m_Workspaces));
+ }
+
+ if (ServerOptions.BuildStoreConfig.Enabled)
+ {
+ CidStoreConfiguration BuildCidConfig;
+ BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas";
+ m_BuildCidStore = std::make_unique<CidStore>(m_GcManager);
+ m_BuildCidStore->Initialize(BuildCidConfig);
+
+ BuildStoreConfig BuildsCfg;
+ BuildsCfg.RootDirectory = m_DataRoot / "builds";
+ BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit;
+ m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore);
+ }
+
+ if (ServerOptions.StructuredCacheConfig.Enabled)
+ {
+ InitializeStructuredCache(ServerOptions);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating structured cache service");
+ }
+
+ if (ServerOptions.ObjectStoreEnabled)
+ {
+ ObjectStoreConfig ObjCfg;
+ ObjCfg.RootDirectory = m_DataRoot / "obj";
+
+ for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets)
+ {
+ ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name};
+ NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory;
+ ObjCfg.Buckets.push_back(std::move(NewBucket));
+ }
+
+ m_ObjStoreService = std::make_unique<HttpObjectStoreService>(m_StatusService, std::move(ObjCfg));
+ }
+
+ if (ServerOptions.BuildStoreConfig.Enabled)
+ {
+ m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatusService, m_StatsService, *m_BuildStore);
+ }
+
+#if ZEN_WITH_VFS
+ m_VfsServiceImpl = std::make_unique<VfsServiceImpl>();
+ m_VfsServiceImpl->AddService(Ref<ProjectStore>(m_ProjectStore));
+ m_VfsServiceImpl->AddService(Ref<ZenCacheStore>(m_CacheStore));
+
+ m_VfsService = std::make_unique<VfsService>(m_StatusService, m_VfsServiceImpl.get());
+#endif // ZEN_WITH_VFS
+
+ ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}",
+ ServerOptions.GcConfig.Enabled,
+ NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull),
+ NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull));
+
+ GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
+ .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds),
+ .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds),
+ .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
+ .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit,
+ .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites,
+ .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds),
+ .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1_Deprecated,
+ .CompactBlockUsageThresholdPercent = ServerOptions.GcConfig.CompactBlockUsageThresholdPercent,
+ .Verbose = ServerOptions.GcConfig.Verbose,
+ .SingleThreaded = ServerOptions.GcConfig.SingleThreaded,
+ .AttachmentPassCount = ServerOptions.GcConfig.AttachmentPassCount};
+ m_GcScheduler.Initialize(GcConfig);
+
+ // Create and register admin interface last to make sure all is properly initialized
+ m_AdminService = std::make_unique<HttpAdminService>(
+ m_GcScheduler,
+ *m_JobQueue,
+ m_CacheStore.Get(),
+ [this]() { Flush(); },
+ HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
+ .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
+ .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
+ ServerOptions);
+}
+
+void
+ZenStorageServer::InitializeAuthentication(const ZenStorageServerOptions& ServerOptions)
+{
+ // Setup authentication manager
+ {
+ ZEN_TRACE_CPU("ZenStorageServer::InitAuth");
+ std::string EncryptionKey = ServerOptions.EncryptionKey;
+
+ if (EncryptionKey.empty())
+ {
+ EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456";
+
+ if (ServerOptions.IsDedicated)
+ {
+ ZEN_WARN("Using default encryption key for authentication state");
+ }
+ }
+
+ std::string EncryptionIV = ServerOptions.EncryptionIV;
+
+ if (EncryptionIV.empty())
+ {
+ EncryptionIV = "0123456789abcdef";
+
+ if (ServerOptions.IsDedicated)
+ {
+ ZEN_WARN("Using default encryption initialization vector for authentication state");
+ }
+ }
+
+ m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth",
+ .EncryptionKey = AesKey256Bit::FromString(EncryptionKey),
+ .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)});
+
+ for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders)
+ {
+ m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId});
+ }
+ }
+
+ m_AuthService = std::make_unique<HttpAuthService>(*m_AuthMgr);
+}
+
+void
+ZenStorageServer::InitializeState(const ZenStorageServerOptions& ServerOptions)
+{
+ ZEN_TRACE_CPU("ZenStorageServer::InitializeState");
+
+ // Check root manifest to deal with schema versioning
+
+ bool WipeState = false;
+ std::string WipeReason = "Unspecified";
+
+ if (ServerOptions.IsCleanStart)
+ {
+ WipeState = true;
+ WipeReason = "clean start requested";
+ }
+
+ bool UpdateManifest = false;
+ std::filesystem::path ManifestPath = m_DataRoot / "root_manifest";
+ Oid StateId = Oid::Zero;
+ DateTime CreatedWhen{0};
+
+ if (!WipeState)
+ {
+ FileContents ManifestData = ReadFile(ManifestPath);
+
+ if (ManifestData.ErrorCode)
+ {
+ if (ServerOptions.IsFirstRun)
+ {
+ ZEN_INFO("Initializing state at '{}'", m_DataRoot);
+
+ UpdateManifest = true;
+ }
+ else
+ {
+ WipeState = true;
+ WipeReason = fmt::format("No manifest present at '{}'", ManifestPath);
+ }
+ }
+ else
+ {
+ IoBuffer Manifest = ManifestData.Flatten();
+
+ if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All);
+ ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("Manifest validation failed: {}, state will be wiped", zen::ToString(ValidationResult));
+
+ WipeState = true;
+ WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, zen::ToString(ValidationResult));
+ }
+ else
+ {
+ m_RootManifest = LoadCompactBinaryObject(Manifest);
+
+ const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+ StateId = m_RootManifest["state_id"].AsObjectId();
+ CreatedWhen = m_RootManifest["created"].AsDateTime();
+
+ if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
+ {
+ std::filesystem::path ManifestSkipSchemaChangePath = m_DataRoot / "root_manifest.ignore_schema_mismatch";
+ if (ManifestVersion != 0 && IsFile(ManifestSkipSchemaChangePath))
+ {
+ ZEN_INFO(
+ "Schema version {} found in '{}' does not match {}, ignoring mismatch due to existance of '{}' and updating "
+ "schema version",
+ ManifestVersion,
+ ManifestPath,
+ ZEN_CFG_SCHEMA_VERSION,
+ ManifestSkipSchemaChangePath);
+ UpdateManifest = true;
+ }
+ else
+ {
+ WipeState = true;
+ WipeReason =
+ fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION);
+ }
+ }
+ }
+ }
+ }
+
+ if (StateId == Oid::Zero)
+ {
+ StateId = Oid::NewOid();
+ UpdateManifest = true;
+ }
+
+ const DateTime Now = DateTime::Now();
+
+ if (CreatedWhen.GetTicks() == 0)
+ {
+ CreatedWhen = Now;
+ UpdateManifest = true;
+ }
+
+ // Handle any state wipe
+
+ if (WipeState)
+ {
+ ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason);
+
+ std::error_code Ec;
+ for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec})
+ {
+ if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs"))
+ {
+ ZEN_INFO("Deleting '{}'", DirEntry.path());
+
+ DeleteDirectories(DirEntry.path(), Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message());
+ }
+ }
+ }
+
+ ZEN_INFO("Wiped all directories in data root");
+
+ UpdateManifest = true;
+ }
+
+ // Write manifest
+
+ {
+ CbObjectWriter Cbo;
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId;
+
+ m_RootManifest = Cbo.Save();
+
+ if (UpdateManifest)
+ {
+ TemporaryFile::SafeWriteFile(ManifestPath, m_RootManifest.GetBuffer().GetView());
+ }
+
+ if (!ServerOptions.IsTest)
+ {
+ try
+ {
+ EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Unable to emit central manifest: ", Ex.what());
+ }
+ }
+ }
+
+ // Write state marker
+
+ {
+ std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
+ static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv;
+ WriteFile(StateMarkerPath, IoBuffer(IoBuffer::Wrap, StateMarkerContent.data(), StateMarkerContent.size()));
+
+ EnqueueStateMarkerTimer();
+ }
+
+ EnqueueStateExitFlagTimer();
+}
+
+void
+ZenStorageServer::InitializeStructuredCache(const ZenStorageServerOptions& ServerOptions)
+{
+ ZEN_TRACE_CPU("ZenStorageServer::InitializeStructuredCache");
+
+ using namespace std::literals;
+
+ ZEN_INFO("instantiating structured cache service");
+ ZenCacheStore::Configuration Config;
+ Config.AllowAutomaticCreationOfNamespaces = true;
+ Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled,
+ .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled};
+
+ for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs)
+ {
+ const std::string& BucketName = It.first;
+ const ZenStructuredCacheBucketConfig& ZenBucketConfig = It.second;
+ ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize,
+ .PayloadAlignment = ZenBucketConfig.PayloadAlignment,
+ .MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold,
+ .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold,
+ .LimitOverwrites = ZenBucketConfig.LimitOverwrites};
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig);
+ }
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.PayloadAlignment =
+ ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold =
+ ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold =
+ ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites;
+ Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes;
+ Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds;
+ Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds;
+
+ if (ServerOptions.IsDedicated)
+ {
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024;
+ }
+
+ m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker());
+ m_OpenProcessCache = std::make_unique<OpenProcessCache>();
+
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
+
+ UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
+ m_UpstreamCache->Initialize();
+
+ if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ // Zen upstream
+ {
+ std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls;
+ if (!UpstreamConfig.ZenConfig.Dns.empty())
+ {
+ for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns)
+ {
+ if (!Dns.empty())
+ {
+ const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls);
+ if (Err)
+ {
+ ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message());
+ }
+ }
+ }
+ }
+
+ std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); });
+
+ if (!ZenUrls.empty())
+ {
+ const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name;
+
+ std::unique_ptr<UpstreamEndpoint> ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint(
+ {.Name = ZenEndpointName,
+ .Urls = ZenUrls,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
+
+ m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+ }
+
+ // Jupiter upstream
+ if (UpstreamConfig.JupiterConfig.Url.empty() == false)
+ {
+ std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name;
+
+ auto Options = JupiterClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
+ .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
+
+ std::unique_ptr<UpstreamEndpoint> JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr);
+
+ m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ *m_UpstreamCache,
+ m_GcManager.GetDiskWriteBlocker(),
+ *m_OpenProcessCache);
+
+ m_StatsReporter.AddProvider(m_CacheStore.Get());
+ m_StatsReporter.AddProvider(m_CidStore.get());
+ m_StatsReporter.AddProvider(m_BuildCidStore.get());
+}
+
+void
+ZenStorageServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ ZEN_INFO(
+ "__________ _________ __ \n"
+ "\\____ /____ ____ / _____// |_ ___________ ____ \n"
+ " / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ \n"
+ " / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ \n"
+ "/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >\n"
+ " \\/ \\/ \\/ \\/ \\/ \n");
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId());
+
+#if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+#endif
+
+#if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+#endif
+
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ if (!m_StartupScrubOptions.empty())
+ {
+ using namespace std::literals;
+
+ ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions);
+
+ bool DoScrub = true;
+ bool DoWait = false;
+ GcScheduler::TriggerScrubParams ScrubParams;
+
+ ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) {
+ if (Token == "nocas"sv)
+ {
+ ScrubParams.SkipCas = true;
+ }
+ else if (Token == "nodelete"sv)
+ {
+ ScrubParams.SkipDelete = true;
+ }
+ else if (Token == "nogc"sv)
+ {
+ ScrubParams.SkipGc = true;
+ }
+ else if (Token == "no"sv)
+ {
+ DoScrub = false;
+ }
+ else if (Token == "wait"sv)
+ {
+ DoWait = true;
+ }
+ return true;
+ });
+
+ if (DoScrub)
+ {
+ m_GcScheduler.TriggerScrub(ScrubParams);
+
+ if (DoWait)
+ {
+ auto State = m_GcScheduler.Status();
+
+ while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped))
+ {
+ Sleep(500);
+
+ State = m_GcScheduler.Status();
+ }
+
+ ZEN_INFO("waiting for Scrub/GC to complete...");
+
+ while (State == GcSchedulerStatus::kRunning)
+ {
+ Sleep(500);
+
+ State = m_GcScheduler.Status();
+ }
+
+ ZEN_INFO("Scrub/GC completed");
+ }
+ }
+ }
+
+ if (m_IsPowerCycle)
+ {
+ ZEN_INFO("Power cycle mode enabled -- shutting down");
+ RequestExit(0);
+ }
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+void
+ZenStorageServer::Cleanup()
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+
+ if (m_JobQueue)
+ {
+ m_JobQueue->Stop();
+ }
+
+ m_StatsReporter.Shutdown();
+ m_GcScheduler.Shutdown();
+
+ Flush();
+
+ m_AdminService.reset();
+ m_VfsService.reset();
+ m_VfsServiceImpl.reset();
+ m_ObjStoreService.reset();
+ m_FrontendService.reset();
+
+ m_BuildStoreService.reset();
+ m_BuildStore = {};
+ m_BuildCidStore.reset();
+
+ m_StructuredCacheService.reset();
+ m_UpstreamService.reset();
+ m_UpstreamCache.reset();
+ m_CacheStore = {};
+ m_OpenProcessCache.reset();
+
+ m_HttpWorkspacesService.reset();
+ m_Workspaces.reset();
+ m_HttpProjectService.reset();
+ m_ProjectStore = {};
+ m_CidStore.reset();
+ m_AuthService.reset();
+ m_AuthMgr.reset();
+ m_Http = {};
+
+ ShutdownWorkerPools();
+
+ m_JobQueue.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+void
+ZenStorageServer::EnqueueStateMarkerTimer()
+{
+ ZEN_MEMSCOPE(GetZenserverTag());
+ m_StateMarkerTimer.expires_after(std::chrono::seconds(5));
+ m_StateMarkerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); });
+ EnsureIoRunner();
+}
+
+void
+ZenStorageServer::CheckStateMarker()
+{
+ ZEN_MEMSCOPE(GetZenserverTag());
+ std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
+ try
+ {
+ if (!IsFile(StateMarkerPath))
+ {
+ ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath);
+ RequestExit(1);
+ return;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what());
+ RequestExit(1);
+ return;
+ }
+ EnqueueStateMarkerTimer();
+}
+
+void
+ZenStorageServer::Flush()
+{
+ ZEN_TRACE_CPU("ZenStorageServer::Flush");
+
+ if (m_CidStore)
+ m_CidStore->Flush();
+
+ if (m_StructuredCacheService)
+ m_StructuredCacheService->Flush();
+
+ if (m_ProjectStore)
+ m_ProjectStore->Flush();
+
+ if (m_BuildCidStore)
+ m_BuildCidStore->Flush();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenStorageServerMain::ZenStorageServerMain(ZenStorageServerOptions& ServerOptions)
+: ZenServerMain(ServerOptions)
+, m_ServerOptions(ServerOptions)
+{
+}
+
+void
+ZenStorageServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenStorageServer Server;
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ auto ServerCleanup = MakeGuard([&Server] { Server.Cleanup(); });
+
+ int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ if (EffectiveBasePort == -1)
+ {
+ // Server.Initialize has already logged what the issue is - just exit with failure code here.
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_monitor");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen