// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif #if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include # include ZEN_THIRD_PARTY_INCLUDES_END #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER # include #endif ////////////////////////////////////////////////////////////////////////// #include "casstore.h" #include "config.h" #include "diag/logging.h" #if ZEN_PLATFORM_WINDOWS # include "windows/service.h" #endif ////////////////////////////////////////////////////////////////////////// // Sentry // #if !defined(ZEN_USE_SENTRY) # if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64 // vcpkg's sentry-native port does not support Arm on Mac. # define ZEN_USE_SENTRY 0 # else # define ZEN_USE_SENTRY 1 # endif #endif #if ZEN_USE_SENTRY # define SENTRY_BUILD_STATIC 1 ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END // Sentry currently does not automatically add all required Windows // libraries to the linker when consumed via vcpkg # if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "sentry.lib") # pragma comment(lib, "dbghelp.lib") # pragma comment(lib, "winhttp.lib") # pragma comment(lib, "version.lib") # endif #endif ////////////////////////////////////////////////////////////////////////// // Services // #include "admin/admin.h" #include "auth/authmgr.h" #include "auth/authservice.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "compute/function.h" #include "diag/diagsvcs.h" #include "experimental/usnjournal.h" #include "frontend/frontend.h" #include "monitoring/httpstats.h" #include "monitoring/httpstatus.h" #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" #include "upstream/upstream.h" #include "zenstore/gc.h" #include "zenstore/scrub.h" #define ZEN_APP_NAME "Zen store" namespace zen { using namespace std::literals; namespace utils { asio::error_code ResolveHostname(asio::io_context& Ctx, std::string_view Host, std::string_view DefaultPort, std::vector& OutEndpoints) { std::string_view Port = DefaultPort; if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) { Port = Host.substr(Idx + 1); Host = Host.substr(0, Idx); } asio::ip::tcp::resolver Resolver(Ctx); asio::error_code ErrorCode; asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); if (!ErrorCode) { for (const asio::ip::tcp::endpoint Ep : Endpoints) { OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); } } return ErrorCode; } } // namespace utils class ZenServer : public IHttpStatusProvider { public: int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { m_UseSentry = ServerOptions.NoSentry == false; m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; const int ParentPid = ServerOptions.OwnerPid; if (ParentPid) { zen::ProcessHandle OwnerProcess; OwnerProcess.Initialize(ParentPid); if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); // If the pid is not reachable should we just shut down immediately? the intended owner process // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str()); } InitializeState(ServerOptions); m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, .AbsLogPath = ServerOptions.AbsLogFile, .HttpServerClass = std::string(ServerOptions.HttpServerClass), .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); // Ok so now we're configured, let's kick things off m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); if (ServerOptions.WebSocketPort != 0) { const uint32 ThreadCount = ServerOptions.WebSocketThreads > 0 ? uint32_t(ServerOptions.WebSocketThreads) : std::thread::hardware_concurrency(); m_WebSocket = zen::WebSocketServer::Create( {.Port = gsl::narrow(ServerOptions.WebSocketPort), .ThreadCount = Max(ThreadCount, uint32_t(16))}); } // Setup authentication manager { std::string EncryptionKey = ServerOptions.EncryptionKey; if (EncryptionKey.empty()) { EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; ZEN_WARN("using default encryption key"); } std::string EncryptionIV = ServerOptions.EncryptionIV; if (EncryptionIV.empty()) { EncryptionIV = "0123456789abcdef"; ZEN_WARN("using default encryption initialization vector"); } m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); m_AuthMgr->AddOpenIdProvider({.Name = "Okta"sv, .Url = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7"sv, .ClientId = "0oapq1knoglGFqQvr0x7"sv}); } m_AuthService = std::make_unique(*m_AuthMgr); m_Http->RegisterService(*m_AuthService); m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); m_Http->RegisterService(m_StatusService); m_StatusService.RegisterHandler("status", *this); // Initialize storage and services ZEN_INFO("initializing storage"); zen::CasStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CasStore->Initialize(Config); m_CidStore = std::make_unique(*m_CasStore, m_DataRoot / "cid"); m_CasGc.SetCidStore(m_CidStore.get()); ZEN_INFO("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); #if ZEN_USE_NAMED_PIPES m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); #endif #if ZEN_WITH_EXEC_SERVICES if (ServerOptions.ExecServiceEnabled) { ZEN_INFO("instantiating exec service"); std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique(*m_CasStore, SandboxDir); } else { ZEN_INFO("NOT instantiating exec services"); } #endif // ZEN_WITH_EXEC_SERVICES #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) { InitializeCompute(ServerOptions); } else { ZEN_INFO("NOT instantiating compute services"); } #endif // ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.StructuredCacheEnabled) { InitializeStructuredCache(ServerOptions); } else { ZEN_INFO("NOT instantiating structured cache service"); } #if ZEN_ENABLE_MESH if (ServerOptions.MeshEnabled) { StartMesh(EffectiveBasePort); } else { ZEN_INFO("NOT starting mesh"); } #endif m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); m_Http->RegisterService(m_AdminService); if (m_WebSocket) { m_WebSocket->RegisterService(m_TestingService); } if (m_HttpProjectService) { m_Http->RegisterService(*m_HttpProjectService); } m_Http->RegisterService(m_CasService); #if ZEN_WITH_EXEC_SERVICES if (ServerOptions.ExecServiceEnabled) { if (m_HttpLaunchService != nullptr) { m_Http->RegisterService(*m_HttpLaunchService); } } #endif // ZEN_WITH_EXEC_SERVICES #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) { if (m_HttpFunctionService != nullptr) { m_Http->RegisterService(*m_HttpFunctionService); } } #endif // ZEN_WITH_COMPUTE_SERVICES m_FrontendService = std::make_unique(m_ContentRoot); if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); zen::GcSchedulerConfig GcConfig{ .RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, }; m_GcScheduler.Initialize(GcConfig); return EffectiveBasePort; } void InitializeState(const ZenServerOptions& ServerOptions); void InitializeStructuredCache(const ZenServerOptions& ServerOptions); void InitializeCompute(const ZenServerOptions& ServerOptions); #if ZEN_ENABLE_MESH void StartMesh(int BasePort) { ZEN_INFO("initializing mesh discovery"); m_ZenMesh.Start(uint16_t(BasePort)); } #endif void Run() { // This is disabled for now, awaiting better scheduling // // Scrub(); if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } if (!m_TestMode) { ZEN_INFO("__________ _________ __ "); ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { sentry_clear_modulecache(); } #endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; SetNewState(kRunning); OnReady(); if (m_WebSocket) { m_WebSocket->Run(); } m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } Flush(); } void RequestExit(int ExitCode) { RequestApplicationExit(ExitCode); m_Http->RequestExit(); } void Cleanup() { ZEN_INFO(ZEN_APP_NAME " cleaning up"); m_GcScheduler.Shutdown(); } void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } std::function m_IsReadyFunc; void SetIsReadyFunc(std::function&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } void OnReady(); void EnsureIoRunner() { if (!m_IoRunner.joinable()) { m_IoRunner = std::thread{[this] { m_IoContext.run(); }}; } } void EnqueueTimer() { m_PidCheckTimer.expires_after(std::chrono::seconds(1)); m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); EnsureIoRunner(); } void CheckOwnerPid() { // Pick up any new "owner" processes std::set AddedPids; for (auto& PidEntry : m_ServerEntry->SponsorPids) { if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) { if (PidEntry.compare_exchange_strong(ThisPid, 0)) { if (AddedPids.insert(ThisPid).second) { m_ProcessMonitor.AddPid(ThisPid); ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); } } } } if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } } void Scrub() { Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; m_CasStore->Scrub(Ctx); m_CidStore->Scrub(Ctx); m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", NiceTimeSpanMs(ElapsedTimeMs), NiceBytes(Ctx.ScrubbedBytes()), Ctx.ScrubbedChunks(), NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() { if (m_CasStore) m_CasStore->Flush(); if (m_CidStore) m_CidStore->Flush(); if (m_StructuredCacheService) m_StructuredCacheService->Flush(); if (m_ProjectStore) m_ProjectStore->Flush(); } virtual void HandleStatusRequest(HttpServerRequest& Request) override { CbObjectWriter Cbo; Cbo << "ok" << true; Cbo << "state" << ToString(m_CurrentState); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } private: ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; bool m_IsDedicatedMode = false; bool m_TestMode = false; CbObject m_RootManifest; std::filesystem::path m_DataRoot; std::filesystem::path m_ContentRoot; std::thread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; enum ServerState { kInitializing, kRunning, kShuttingDown } m_CurrentState = kInitializing; inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } std::string_view ToString(ServerState Value) { switch (Value) { case kInitializing: return "initializing"sv; case kRunning: return "running"sv; case kShuttingDown: return "shutdown"sv; default: return "unknown"sv; } } zen::Ref m_Http; std::unique_ptr m_WebSocket; std::unique_ptr m_AuthMgr; std::unique_ptr m_AuthService; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; zen::CasGc m_CasGc; zen::GcScheduler m_GcScheduler{m_CasGc}; std::unique_ptr m_CasStore{zen::CreateCasStore(m_CasGc)}; std::unique_ptr m_CidStore; std::unique_ptr m_CacheStore; zen::CasScrubber m_Scrubber{*m_CasStore}; zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr m_ProjectStore; zen::Ref m_LocalProjectService; std::unique_ptr m_HttpProjectService; std::unique_ptr m_UpstreamCache; std::unique_ptr m_UpstreamService; std::unique_ptr m_StructuredCacheService; zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; zen::MeshTracker m_ZenMesh{m_IoContext}; #if ZEN_WITH_EXEC_SERVICES std::unique_ptr m_HttpLaunchService; #endif // ZEN_WITH_EXEC_SERVICES #if ZEN_WITH_COMPUTE_SERVICES std::unique_ptr m_HttpFunctionService; #endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr m_FrontendService; #if ZEN_USE_NAMED_PIPES zen::Ref m_LocalProjectService; #endif bool m_DebugOptionForcedCrash = false; bool m_UseSentry = false; }; void ZenServer::OnReady() { m_ServerEntry->SignalReady(); if (m_IsReadyFunc) { m_IsReadyFunc(); } } void ZenServer::InitializeState(const ZenServerOptions& ServerOptions) { // Check root manifest to deal with schema versioning bool WipeState = false; std::string WipeReason = "Unspecified"; bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; FileContents ManifestData = zen::ReadFile(ManifestPath); if (ManifestData.ErrorCode) { if (ServerOptions.IsFirstRun) { ZEN_INFO("Initializing state at '{}'", m_DataRoot); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); } } else { IoBuffer Manifest = ManifestData.Flatten(); if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); ValidationResult != CbValidateError::None) { ZEN_ERROR("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); WipeState = true; WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult)); } else { m_RootManifest = LoadCompactBinaryObject(Manifest); const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { WipeState = true; WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); } } } // Release any open handles so we can overwrite the manifest ManifestData = {}; // Handle any state wipe if (WipeState) { ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); std::error_code Ec; for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) { if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) { ZEN_INFO("Deleting '{}'", DirEntry.path()); std::filesystem::remove_all(DirEntry.path(), Ec); if (Ec) { ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); } } } ZEN_INFO("Wiped all directories in data root"); UpdateManifest = true; } if (UpdateManifest) { // Write new manifest const DateTime Now = DateTime::Now(); CbObjectWriter Cbo; Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid(); m_RootManifest = Cbo.Save(); WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); } } void ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { using namespace std::literals; ZEN_INFO("instantiating structured cache service"); m_CacheStore = std::make_unique( m_CasGc, ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true}); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast(UpstreamConfig.UpstreamThreadCount); } m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore); m_UpstreamService = std::make_unique(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { // Zen upstream { std::vector ZenUrls = UpstreamConfig.ZenConfig.Urls; if (!UpstreamConfig.ZenConfig.Dns.empty()) { for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) { if (!Dns.empty()) { const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); if (Err) { ZEN_ERROR("resolve FAILED, reason '{}'", Err.message()); } } } } std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); if (!ZenUrls.empty()) { const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; std::unique_ptr ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint( {.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } // Jupiter upstream if (UpstreamConfig.JupiterConfig.Url.empty() == false) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; auto Options = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.JupiterConfig.Url, .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), .UseLegacyDdc = false}; auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; std::unique_ptr JupiterEndpoint = zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } m_StructuredCacheService = std::make_unique(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); } #if ZEN_WITH_COMPUTE_SERVICES void ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) { ServerOptions; const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; // Horde compute upstream if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false) { ZEN_INFO("instantiating compute service"); std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; auto ComputeOptions = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.HordeConfig.Url, .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; auto StorageOptions = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; m_HttpFunctionService = std::make_unique(*m_CasStore, *m_CidStore, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, *m_AuthMgr); } else { ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)"); } } #endif // ZEN_WITH_COMPUTE_SERVICES //////////////////////////////////////////////////////////////////////////////// class ZenEntryPoint { public: ZenEntryPoint(ZenServerOptions& ServerOptions); ZenEntryPoint(const ZenEntryPoint&) = delete; ZenEntryPoint& operator=(const ZenEntryPoint&) = delete; int Run(); private: ZenServerOptions& m_ServerOptions; zen::LockFile m_LockFile; }; ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions) { } #if ZEN_USE_SENTRY static void SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata) { char LogMessageBuffer[160]; std::string LogMessage; const char* MessagePtr = LogMessageBuffer; int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args); if (n >= int(sizeof LogMessageBuffer)) { LogMessage.resize(n + 1); n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args); MessagePtr = LogMessage.c_str(); } switch (Level) { case SENTRY_LEVEL_DEBUG: ConsoleLog().debug("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_INFO: ConsoleLog().info("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_WARNING: ConsoleLog().warn("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_ERROR: ConsoleLog().error("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_FATAL: ConsoleLog().critical("sentry: {}", MessagePtr); break; } } #endif int ZenEntryPoint::Run() { #if ZEN_USE_SENTRY std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native"); if (m_ServerOptions.NoSentry == false) { sentry_options_t* SentryOptions = sentry_options_new(); sentry_options_set_dsn(SentryOptions, "https://8ba3441bebc941c1ae24b8cd2fd25d55@o10593.ingest.sentry.io/5919284"); sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); sentry_options_set_logger(SentryOptions, SentryLogFunction, this); // sentry_options_set_debug(SentryOptions, 1); if (int ErrorCode = sentry_init(SentryOptions); ErrorCode == 0) { printf("sentry initialized"); } else { printf("sentry_init returned failure! (error code: %d)", ErrorCode); } } auto _ = zen::MakeGuard([] { sentry_close(); }); #endif auto& ServerOptions = m_ServerOptions; try { // Mutual exclusion and synchronization std::error_code Ec; std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock"; bool IsReady = false; auto MakeLockData = [&] { CbObjectWriter Cbo; Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort << "session_id" << GetSessionId() << "ready" << IsReady; return Cbo.Save(); }; m_LockFile.Create(LockFilePath, MakeLockData(), Ec); if (Ec) { ConsoleLog().error("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message()); std::exit(99); } InitializeLogging(ServerOptions); MaximizeOpenFileCount(); ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath); ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL); ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort); if (Entry) { // Instance already running for this port? Should double check pid ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid); if (ServerOptions.OwnerPid) { Entry->AddSponsorProcess(ServerOptions.OwnerPid); std::exit(0); } } Entry = ServerState.Register(ServerOptions.BasePort); if (ServerOptions.OwnerPid) { Entry->AddSponsorProcess(ServerOptions.OwnerPid); } ZenServer Server; Server.SetDataRoot(ServerOptions.DataDir); Server.SetContentRoot(ServerOptions.ContentDir); Server.SetTestMode(ServerOptions.IsTest); Server.SetDedicatedMode(ServerOptions.IsDedicated); int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; zen::ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal received"); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { IsReady = true; m_LockFile.Update(MakeLockData(), Ec); if (!ServerOptions.ChildId.empty()) { zen::NamedEvent ParentEvent{ServerOptions.ChildId}; ParentEvent.Set(); } }); Server.Run(); Server.Cleanup(); ShutdownEvent->Set(); ShutdownThread->join(); } catch (std::exception& e) { SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); } ShutdownLogging(); return 0; } } // namespace zen //////////////////////////////////////////////////////////////////////////////// #if ZEN_PLATFORM_WINDOWS class ZenWindowsService : public WindowsService { public: ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {} ZenWindowsService(const ZenWindowsService&) = delete; ZenWindowsService& operator=(const ZenWindowsService&) = delete; virtual int Run() override; private: zen::ZenEntryPoint m_EntryPoint; }; int ZenWindowsService::Run() { return m_EntryPoint.Run(); } #endif // ZEN_PLATFORM_WINDOWS //////////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS int test_main(int argc, char** argv) { zen::zencore_forcelinktests(); zen::zenhttp_forcelinktests(); zen::zenstore_forcelinktests(); zen::z$_forcelink(); zen::z$service_forcelink(); zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); zen::MaximizeOpenFileCount(); return ZEN_RUN_TESTS(argc, argv); } #endif int main(int argc, char* argv[]) { using namespace zen; #if ZEN_USE_MIMALLOC mi_version(); #endif #if ZEN_WITH_TESTS if (argc >= 2) { if (argv[1] == "test"sv) { return test_main(argc, argv); } } #endif try { ZenServerOptions ServerOptions; ParseCliOptions(argc, argv, ServerOptions); if (!std::filesystem::exists(ServerOptions.DataDir)) { ServerOptions.IsFirstRun = true; std::filesystem::create_directories(ServerOptions.DataDir); } #if ZEN_WITH_TRACE if (ServerOptions.TraceHost.size()) { TraceInit(ServerOptions.TraceHost.c_str(), TraceType::Network); } else if (ServerOptions.TraceFile.size()) { TraceInit(ServerOptions.TraceFile.c_str(), TraceType::File); } else { TraceInit(nullptr, TraceType::None); } #endif // ZEN_WITH_TRACE #if ZEN_PLATFORM_WINDOWS if (ServerOptions.InstallService) { WindowsService::Install(); std::exit(0); } if (ServerOptions.UninstallService) { WindowsService::Delete(); std::exit(0); } ZenWindowsService App(ServerOptions); return App.ServiceMain(); #else if (ServerOptions.InstallService || ServerOptions.UninstallService) { throw std::runtime_error("Service mode is not supported on this platform"); } ZenEntryPoint App(ServerOptions); return App.Run(); #endif // ZEN_PLATFORM_WINDOWS } catch (std::exception& Ex) { fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what()); return 1; } }