// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif #if ZEN_PLATFORM_LINUX # include #endif #if ZEN_PLATFORM_MAC # include #endif #if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include # include ZEN_THIRD_PARTY_INCLUDES_END #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include #endif ////////////////////////////////////////////////////////////////////////// #include "config.h" #include "diag/logging.h" #if ZEN_PLATFORM_WINDOWS # include "windows/service.h" #endif ////////////////////////////////////////////////////////////////////////// // Sentry // #if !defined(ZEN_USE_SENTRY) # if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64 // vcpkg's sentry-native port does not support Arm on Mac. # define ZEN_USE_SENTRY 0 # else # define ZEN_USE_SENTRY 1 # endif #endif #if ZEN_USE_SENTRY # define SENTRY_BUILD_STATIC 1 ZEN_THIRD_PARTY_INCLUDES_START # include # include ZEN_THIRD_PARTY_INCLUDES_END // Sentry currently does not automatically add all required Windows // libraries to the linker when consumed via vcpkg # if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "sentry.lib") # pragma comment(lib, "dbghelp.lib") # pragma comment(lib, "winhttp.lib") # pragma comment(lib, "version.lib") # endif #endif ////////////////////////////////////////////////////////////////////////// // Services // #include #include #include #include #include #include #include #include "admin/admin.h" #include "cache/httpstructuredcache.h" #include "cache/structuredcachestore.h" #include "compute/function.h" #include "frontend/frontend.h" #include "httpcidstore.h" #include "objectstore/objectstore.h" #include "projectstore/httpprojectstore.h" #include "projectstore/projectstore.h" #include "upstream/upstream.h" #include "vfs/vfsservice.h" #define ZEN_APP_NAME "Zen store" namespace zen { using namespace std::literals; namespace utils { static std::atomic_uint32_t SignalCounter[NSIG] = {0}; static void SignalCallbackHandler(int SigNum) { if (SigNum >= 0 && SigNum < NSIG) { SignalCounter[SigNum].fetch_add(1); } } #if ZEN_USE_SENTRY class sentry_sink final : public spdlog::sinks::base_sink { public: sentry_sink() {} protected: static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG, SENTRY_LEVEL_DEBUG, SENTRY_LEVEL_INFO, SENTRY_LEVEL_WARNING, SENTRY_LEVEL_ERROR, SENTRY_LEVEL_FATAL, SENTRY_LEVEL_DEBUG}; void sink_it_(const spdlog::details::log_msg& msg) override { std::string Message = fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname); sentry_value_t event = sentry_value_new_message_event( /* level */ MapToSentryLevel[msg.level], /* logger */ nullptr, /* message */ Message.c_str()); sentry_event_value_add_stacktrace(event, NULL, 0); sentry_capture_event(event); } void flush_() override {} }; struct SentryAssertImpl : AssertImpl { ZEN_FORCENOINLINE ZEN_DEBUG_SECTION SentryAssertImpl() : PrevAssertImpl(CurrentAssertImpl) { CurrentAssertImpl = this; } virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~SentryAssertImpl() { CurrentAssertImpl = PrevAssertImpl; } virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) { std::string Message = fmt::format("ASSERT {}:({}) [{}]\n\"{}\"", Filename, LineNumber, FunctionName, Msg); sentry_value_t event = sentry_value_new_message_event( /* level */ SENTRY_LEVEL_ERROR, /* logger */ nullptr, /* message */ Message.c_str()); sentry_event_value_add_stacktrace(event, NULL, 0); sentry_capture_event(event); } AssertImpl* PrevAssertImpl; }; #endif asio::error_code ResolveHostname(asio::io_context& Ctx, std::string_view Host, std::string_view DefaultPort, std::vector& OutEndpoints) { std::string_view Port = DefaultPort; if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) { Port = Host.substr(Idx + 1); Host = Host.substr(0, Idx); } asio::ip::tcp::resolver Resolver(Ctx); asio::error_code ErrorCode; asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); if (!ErrorCode) { for (const asio::ip::tcp::endpoint Ep : Endpoints) { OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); } } return ErrorCode; } } // namespace utils class ZenServer : public IHttpStatusProvider { public: ~ZenServer() {} int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { m_UseSentry = ServerOptions.NoSentry == false; m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; const int ParentPid = ServerOptions.OwnerPid; if (ParentPid) { zen::ProcessHandle OwnerProcess; OwnerProcess.Initialize(ParentPid); if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); // If the pid is not reachable should we just shut down immediately? the intended owner process // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str()); } InitializeState(ServerOptions); m_JobQueue = MakeJobQueue(8, "backgroundjobs"); m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, .AbsLogPath = ServerOptions.AbsLogFile, .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass), .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); // Ok so now we're configured, let's kick things off m_Http = zen::CreateHttpServer(ServerOptions.HttpServerConfig); int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); if (ServerOptions.WebSocketPort != 0) { const uint32_t ThreadCount = ServerOptions.WebSocketThreads > 0 ? uint32_t(ServerOptions.WebSocketThreads) : std::thread::hardware_concurrency(); m_WebSocket = zen::WebSocketServer::Create( {.Port = gsl::narrow(ServerOptions.WebSocketPort), .ThreadCount = Max(ThreadCount, uint32_t(16))}); } // Setup authentication manager { std::string EncryptionKey = ServerOptions.EncryptionKey; if (EncryptionKey.empty()) { EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; ZEN_WARN("using default encryption key"); } std::string EncryptionIV = ServerOptions.EncryptionIV; if (EncryptionIV.empty()) { EncryptionIV = "0123456789abcdef"; ZEN_WARN("using default encryption initialization vector"); } m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) { m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); } } m_AuthService = std::make_unique(*m_AuthMgr); m_Http->RegisterService(*m_AuthService); m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); m_Http->RegisterService(m_StatusService); m_StatusService.RegisterHandler("status", *this); // Initialize storage and services ZEN_INFO("initializing storage"); zen::CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); m_CidService.reset(new zen::HttpCidService{*m_CidStore}); ZEN_INFO("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) { InitializeCompute(ServerOptions); } else { ZEN_INFO("NOT instantiating compute services"); } #endif // ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.StructuredCacheEnabled) { InitializeStructuredCache(ServerOptions); } else { ZEN_INFO("NOT instantiating structured cache service"); } m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics #if ZEN_WITH_TESTS m_Http->RegisterService(m_TestingService); if (m_WebSocket) { m_WebSocket->RegisterService(m_TestingService); } #endif if (m_HttpProjectService) { m_Http->RegisterService(*m_HttpProjectService); } m_Http->RegisterService(*m_CidService); #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) { if (m_HttpFunctionService != nullptr) { m_Http->RegisterService(*m_HttpFunctionService); } } #endif // ZEN_WITH_COMPUTE_SERVICES m_FrontendService = std::make_unique(m_ContentRoot); if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } if (ServerOptions.ObjectStoreEnabled) { ObjectStoreConfig ObjCfg; ObjCfg.RootDirectory = m_DataRoot / "obj"; ObjCfg.ServerPort = static_cast(EffectiveBasePort); for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) { ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; ObjCfg.Buckets.push_back(std::move(NewBucket)); } m_ObjStoreService = std::make_unique(std::move(ObjCfg)); m_Http->RegisterService(*m_ObjStoreService); } m_VfsService = std::make_unique(); m_VfsService->AddService(Ref(m_ProjectStore)); m_VfsService->AddService(Ref(m_CacheStore)); m_Http->RegisterService(*m_VfsService); ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); zen::GcSchedulerConfig GcConfig{ .RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites}; m_GcScheduler.Initialize(GcConfig); // Create and register admin interface last to make sure all is properly initialized m_AdminService = std::make_unique(m_GcScheduler, *m_JobQueue); m_Http->RegisterService(*m_AdminService); return EffectiveBasePort; } void InitializeState(const ZenServerOptions& ServerOptions); void InitializeStructuredCache(const ZenServerOptions& ServerOptions); void InitializeCompute(const ZenServerOptions& ServerOptions); void Run() { // This is disabled for now, awaiting better scheduling // // ScrubStorage(); if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } if (!m_TestMode) { ZEN_INFO("__________ _________ __ "); ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { sentry_clear_modulecache(); } #endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; SetNewState(kRunning); OnReady(); if (m_WebSocket) { m_WebSocket->Run(); } m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); Flush(); } void RequestExit(int ExitCode) { RequestApplicationExit(ExitCode); if (m_Http) { m_Http->RequestExit(); } } void Cleanup() { ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } if (m_Http) { m_Http->Close(); } if (m_JobQueue) { m_JobQueue->Stop(); } m_GcScheduler.Shutdown(); m_AdminService.reset(); m_VfsService.reset(); m_ObjStoreService.reset(); m_FrontendService.reset(); m_StructuredCacheService.reset(); m_UpstreamService.reset(); m_UpstreamCache.reset(); m_CacheStore = {}; #if ZEN_WITH_COMPUTE_SERVICES m_HttpFunctionService.reset(); #endif // ZEN_WITH_COMPUTE_SERVICES m_HttpProjectService.reset(); m_ProjectStore = {}; m_CidService.reset(); m_CidStore.reset(); m_AuthService.reset(); m_AuthMgr.reset(); m_WebSocket.reset(); m_Http = {}; m_JobQueue.reset(); } catch (std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } std::function m_IsReadyFunc; void SetIsReadyFunc(std::function&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } void OnReady(); void EnsureIoRunner() { if (!m_IoRunner.joinable()) { m_IoRunner = std::thread{[this] { zen::SetCurrentThreadName("timer_io"); m_IoContext.run(); }}; } } void EnqueueTimer() { m_PidCheckTimer.expires_after(std::chrono::seconds(1)); m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); EnsureIoRunner(); } void EnqueueStateMarkerTimer() { m_StateMakerTimer.expires_after(std::chrono::seconds(5)); m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); EnsureIoRunner(); } void EnqueueSigIntTimer() { m_SigIntTimer.expires_after(std::chrono::milliseconds(500)); m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); }); EnsureIoRunner(); } void CheckStateMarker() { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; try { if (!std::filesystem::exists(StateMarkerPath)) { ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); RequestExit(1); return; } } catch (std::exception& Ex) { ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); RequestExit(1); return; } EnqueueStateMarkerTimer(); } void CheckSigInt() { if (utils::SignalCounter[SIGINT] > 0) { ZEN_INFO("SIGINT triggered (Ctrl+C), exiting"); RequestExit(128 + SIGINT); return; } EnqueueSigIntTimer(); } void CheckOwnerPid() { // Pick up any new "owner" processes std::set AddedPids; for (auto& PidEntry : m_ServerEntry->SponsorPids) { if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) { if (PidEntry.compare_exchange_strong(ThisPid, 0)) { if (AddedPids.insert(ThisPid).second) { m_ProcessMonitor.AddPid(ThisPid); ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); } } } } if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } } void ScrubStorage() { Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); WorkerThreadPool ThreadPool{1}; ScrubContext Ctx{ThreadPool}; m_CidStore->ScrubStorage(Ctx); m_ProjectStore->ScrubStorage(Ctx); m_StructuredCacheService->ScrubStorage(Ctx); const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", NiceTimeSpanMs(ElapsedTimeMs), NiceBytes(Ctx.ScrubbedBytes()), Ctx.ScrubbedChunks(), NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() { if (m_CidStore) m_CidStore->Flush(); if (m_StructuredCacheService) m_StructuredCacheService->Flush(); if (m_ProjectStore) m_ProjectStore->Flush(); } virtual void HandleStatusRequest(HttpServerRequest& Request) override { CbObjectWriter Cbo; Cbo << "ok" << true; Cbo << "state" << ToString(m_CurrentState); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } private: ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; bool m_IsDedicatedMode = false; bool m_TestMode = false; CbObject m_RootManifest; std::filesystem::path m_DataRoot; std::filesystem::path m_ContentRoot; std::thread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; asio::steady_timer m_StateMakerTimer{m_IoContext}; asio::steady_timer m_SigIntTimer{m_IoContext}; zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; enum ServerState { kInitializing, kRunning, kShuttingDown } m_CurrentState = kInitializing; inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } std::string_view ToString(ServerState Value) { switch (Value) { case kInitializing: return "initializing"sv; case kRunning: return "running"sv; case kShuttingDown: return "shutdown"sv; default: return "unknown"sv; } } zen::Ref m_Http; std::unique_ptr m_WebSocket; std::unique_ptr m_AuthMgr; std::unique_ptr m_AuthService; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; zen::GcManager m_GcManager; zen::GcScheduler m_GcScheduler{m_GcManager}; std::unique_ptr m_CidStore; Ref m_CacheStore; zen::HttpTestService m_TestService; #if ZEN_WITH_TESTS zen::HttpTestingService m_TestingService; #endif std::unique_ptr m_CidService; zen::RefPtr m_ProjectStore; std::unique_ptr m_HttpProjectService; std::unique_ptr m_UpstreamCache; std::unique_ptr m_UpstreamService; std::unique_ptr m_StructuredCacheService; zen::HttpHealthService m_HealthService; #if ZEN_WITH_COMPUTE_SERVICES std::unique_ptr m_HttpFunctionService; #endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr m_FrontendService; std::unique_ptr m_ObjStoreService; std::unique_ptr m_VfsService; std::unique_ptr m_JobQueue; std::unique_ptr m_AdminService; bool m_DebugOptionForcedCrash = false; bool m_UseSentry = false; }; void ZenServer::OnReady() { m_ServerEntry->SignalReady(); if (m_IsReadyFunc) { m_IsReadyFunc(); } } void ZenServer::InitializeState(const ZenServerOptions& ServerOptions) { EnqueueSigIntTimer(); // Check root manifest to deal with schema versioning bool WipeState = false; std::string WipeReason = "Unspecified"; bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; FileContents ManifestData = zen::ReadFile(ManifestPath); if (ManifestData.ErrorCode) { if (ServerOptions.IsFirstRun) { ZEN_INFO("Initializing state at '{}'", m_DataRoot); UpdateManifest = true; } else { WipeState = true; WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); } } else { IoBuffer Manifest = ManifestData.Flatten(); if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); ValidationResult != CbValidateError::None) { ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); WipeState = true; WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult)); } else { m_RootManifest = LoadCompactBinaryObject(Manifest); const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { WipeState = true; WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); } } } // Release any open handles so we can overwrite the manifest ManifestData = {}; // Handle any state wipe if (WipeState) { ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); std::error_code Ec; for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) { if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) { ZEN_INFO("Deleting '{}'", DirEntry.path()); std::filesystem::remove_all(DirEntry.path(), Ec); if (Ec) { ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); } } } ZEN_INFO("Wiped all directories in data root"); UpdateManifest = true; } if (UpdateManifest) { // Write new manifest const DateTime Now = DateTime::Now(); CbObjectWriter Cbo; Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid(); m_RootManifest = Cbo.Save(); WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); } { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; if (!std::filesystem::is_regular_file(StateMarkerPath)) { static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv; WriteFile(StateMarkerPath, IoBuffer(IoBuffer::Wrap, StateMarkerContent.data(), StateMarkerContent.size())); } EnqueueStateMarkerTimer(); } } void ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { using namespace std::literals; ZEN_INFO("instantiating structured cache service"); m_CacheStore = new ZenCacheStore(m_GcManager, ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true, .EnableWriteLog = ServerOptions.StructuredCacheWriteLogEnabled, .EnableAccessLog = ServerOptions.StructuredCacheAccessLogEnabled}, m_GcManager.GetDiskWriteBlocker()); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast(UpstreamConfig.UpstreamThreadCount); } m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore); m_UpstreamService = std::make_unique(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { // Zen upstream { std::vector ZenUrls = UpstreamConfig.ZenConfig.Urls; if (!UpstreamConfig.ZenConfig.Dns.empty()) { for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) { if (!Dns.empty()) { const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); if (Err) { ZEN_ERROR("resolve FAILED, reason '{}'", Err.message()); } } } } std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); if (!ZenUrls.empty()) { const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; std::unique_ptr ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint( {.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } // Jupiter upstream if (UpstreamConfig.JupiterConfig.Url.empty() == false) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; auto Options = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.JupiterConfig.Url, .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; std::unique_ptr JupiterEndpoint = zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } m_StructuredCacheService = std::make_unique(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache, m_GcManager.GetDiskWriteBlocker()); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); } #if ZEN_WITH_COMPUTE_SERVICES void ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) { ServerOptions; const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; // Horde compute upstream if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false) { ZEN_INFO("instantiating compute service"); std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; auto ComputeOptions = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.HordeConfig.Url, .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; auto StorageOptions = zen::CloudCacheClientOptions{.Name = EndpointName, .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; m_HttpFunctionService = std::make_unique(*m_CidStore, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, *m_AuthMgr); } else { ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)"); } } #endif // ZEN_WITH_COMPUTE_SERVICES //////////////////////////////////////////////////////////////////////////////// class ZenEntryPoint { public: ZenEntryPoint(ZenServerOptions& ServerOptions); ZenEntryPoint(const ZenEntryPoint&) = delete; ZenEntryPoint& operator=(const ZenEntryPoint&) = delete; int Run(); private: ZenServerOptions& m_ServerOptions; zen::LockFile m_LockFile; }; ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions) { } #if ZEN_USE_SENTRY static void SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata) { char LogMessageBuffer[160]; std::string LogMessage; const char* MessagePtr = LogMessageBuffer; int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args); if (n >= int(sizeof LogMessageBuffer)) { LogMessage.resize(n + 1); n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args); MessagePtr = LogMessage.c_str(); } switch (Level) { case SENTRY_LEVEL_DEBUG: ConsoleLog().debug("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_INFO: ConsoleLog().info("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_WARNING: ConsoleLog().warn("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_ERROR: ConsoleLog().error("sentry: {}", MessagePtr); break; case SENTRY_LEVEL_FATAL: ConsoleLog().critical("sentry: {}", MessagePtr); break; } } #endif int ZenEntryPoint::Run() { #if ZEN_USE_SENTRY int SentryErrorCode = 0; std::unique_ptr SentryAssert; std::string SentryUserName; if (m_ServerOptions.NoSentry == false) { std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native"); if (SentryDatabasePath.starts_with("\\\\?\\")) { SentryDatabasePath = SentryDatabasePath.substr(4); } sentry_options_t* SentryOptions = sentry_options_new(); sentry_options_set_dsn(SentryOptions, "https://8ba3441bebc941c1ae24b8cd2fd25d55@o10593.ingest.sentry.io/5919284"); sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); sentry_options_set_logger(SentryOptions, SentryLogFunction, this); std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string(); if (SentryAttachmentPath.starts_with("\\\\?\\")) { SentryAttachmentPath = SentryAttachmentPath.substr(4); } sentry_options_add_attachment(SentryOptions, SentryAttachmentPath.c_str()); sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION); // sentry_options_set_debug(SentryOptions, 1); SentryErrorCode = sentry_init(SentryOptions); if (SentryErrorCode == 0) { if (m_ServerOptions.SentryAllowPII) { # if ZEN_PLATFORM_WINDOWS CHAR UserNameBuffer[511 + 1]; DWORD UserNameLength = sizeof(UserNameBuffer) / sizeof(CHAR); BOOL OK = GetUserNameA(UserNameBuffer, &UserNameLength); if (OK) { SentryUserName = std::string(UserNameBuffer, UserNameLength); } # endif // ZEN_PLATFORM_WINDOWS # if (ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC) uid_t uid = geteuid(); struct passwd* pw = getpwuid(uid); if (pw) { SentryUserName = std::string(pw->pw_name); } # endif sentry_value_t SentryUserObject = sentry_value_new_object(); sentry_value_set_by_key(SentryUserObject, "id", sentry_value_new_string(SentryUserName.c_str())); sentry_value_set_by_key(SentryUserObject, "username", sentry_value_new_string(SentryUserName.c_str())); sentry_value_set_by_key(SentryUserObject, "ip_address", sentry_value_new_string("{{auto}}")); sentry_set_user(SentryUserObject); } auto SentrySink = spdlog::create("sentry"); zen::logging::SetErrorLog(std::move(SentrySink)); SentryAssert = std::make_unique(); } } auto _ = zen::MakeGuard([&SentryAssert, SentryErrorCode] { if (SentryErrorCode == 0) { SentryAssert.reset(); zen::logging::SetErrorLog(std::shared_ptr()); sentry_close(); } }); #endif auto& ServerOptions = m_ServerOptions; try { // Mutual exclusion and synchronization ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort); if (Entry) { if (ServerOptions.OwnerPid) { ZEN_INFO( "Looks like there is already a process listening to this port {} (pid: {}), attaching owner pid {} to running instance", ServerOptions.BasePort, Entry->Pid.load(), ServerOptions.OwnerPid); Entry->AddSponsorProcess(ServerOptions.OwnerPid); std::exit(0); } else { ZEN_WARN("Exiting since there is already a process listening to port {} (pid: {})", ServerOptions.BasePort, Entry->Pid.load()); std::exit(1); } } std::error_code Ec; std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock"; bool IsReady = false; auto MakeLockData = [&] { CbObjectWriter Cbo; Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort << "session_id" << GetSessionId() << "ready" << IsReady; return Cbo.Save(); }; m_LockFile.Create(LockFilePath, MakeLockData(), Ec); if (Ec) { ZEN_WARN("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message()); std::exit(99); } InitializeLogging(ServerOptions); #if ZEN_USE_SENTRY if (m_ServerOptions.NoSentry == false) { if (SentryErrorCode == 0) { if (m_ServerOptions.SentryAllowPII) { ZEN_INFO("sentry initialized, username: '{}'", SentryUserName); } else { ZEN_INFO("sentry initialized with anonymous reports"); } } else { ZEN_WARN("sentry_init returned failure! (error code: {})", SentryErrorCode); } } #endif MaximizeOpenFileCount(); ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath); ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL); Entry = ServerState.Register(ServerOptions.BasePort); if (ServerOptions.OwnerPid) { Entry->AddSponsorProcess(ServerOptions.OwnerPid); } ZenServer Server; Server.SetDataRoot(ServerOptions.DataDir); Server.SetContentRoot(ServerOptions.ContentDir); Server.SetTestMode(ServerOptions.IsTest); Server.SetDedicatedMode(ServerOptions.IsDedicated); auto ServerCleanup = zen::MakeGuard([&Server] { Server.Cleanup(); }); int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; zen::ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { zen::SetCurrentThreadName("shutdown_monitor"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); if (ShutdownEvent->Wait()) { if (!IsApplicationExitRequested()) { ZEN_INFO("shutdown signal received"); Server.RequestExit(0); } } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = zen::MakeGuard([&ShutdownEvent, &ShutdownThread] { if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { IsReady = true; m_LockFile.Update(MakeLockData(), Ec); if (!ServerOptions.ChildId.empty()) { zen::NamedEvent ParentEvent{ServerOptions.ChildId}; ParentEvent.Set(); } }); Server.Run(); } catch (std::exception& e) { SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); if (!IsApplicationExitRequested()) { RequestApplicationExit(1); } } ShutdownLogging(); return ApplicationExitCode(); } } // namespace zen //////////////////////////////////////////////////////////////////////////////// #if ZEN_PLATFORM_WINDOWS class ZenWindowsService : public WindowsService { public: ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {} ZenWindowsService(const ZenWindowsService&) = delete; ZenWindowsService& operator=(const ZenWindowsService&) = delete; virtual int Run() override; private: zen::ZenEntryPoint m_EntryPoint; }; int ZenWindowsService::Run() { return m_EntryPoint.Run(); } #endif // ZEN_PLATFORM_WINDOWS //////////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS int test_main(int argc, char** argv) { zen::zencore_forcelinktests(); zen::zenhttp_forcelinktests(); zen::zenstore_forcelinktests(); zen::z$_forcelink(); zen::z$service_forcelink(); zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); zen::MaximizeOpenFileCount(); return ZEN_RUN_TESTS(argc, argv); } #endif int main(int argc, char* argv[]) { using namespace zen; #if ZEN_USE_MIMALLOC mi_version(); #endif if (argc >= 2) { if (argv[1] == "test"sv) { #if ZEN_WITH_TESTS return test_main(argc, argv); #else fprintf(stderr, "test option not available in release mode!\n"); exit(5); #endif } } signal(SIGINT, utils::SignalCallbackHandler); try { ZenServerOptions ServerOptions; ParseCliOptions(argc, argv, ServerOptions); if (!std::filesystem::exists(ServerOptions.DataDir)) { ServerOptions.IsFirstRun = true; std::filesystem::create_directories(ServerOptions.DataDir); } #if ZEN_WITH_TRACE if (ServerOptions.TraceHost.size()) { TraceStart(ServerOptions.TraceHost.c_str(), TraceType::Network); } else if (ServerOptions.TraceFile.size()) { TraceStart(ServerOptions.TraceFile.c_str(), TraceType::File); } else { TraceInit(); } atexit(TraceShutdown); #endif // ZEN_WITH_TRACE #if ZEN_PLATFORM_WINDOWS if (ServerOptions.InstallService) { WindowsService::Install(); std::exit(0); } if (ServerOptions.UninstallService) { WindowsService::Delete(); std::exit(0); } ZenWindowsService App(ServerOptions); return App.ServiceMain(); #else if (ServerOptions.InstallService || ServerOptions.UninstallService) { throw std::runtime_error("Service mode is not supported on this platform"); } ZenEntryPoint App(ServerOptions); return App.Run(); #endif // ZEN_PLATFORM_WINDOWS } catch (std::exception& Ex) { fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what()); return 1; } }