// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_USE_MIMALLOC # include # include #endif #include #include #include #include #include #include #include #include #if !defined(BUILD_VERSION) # define BUILD_VERSION ("dev-build") #endif ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable #if ZEN_WITH_TESTS # define DOCTEST_CONFIG_IMPLEMENT # include # undef DOCTEST_CONFIG_IMPLEMENT #endif ////////////////////////////////////////////////////////////////////////// #include "casstore.h" #include "config.h" #include "diag/logging.h" #if ZEN_PLATFORM_WINDOWS # include "windows/service.h" #endif ////////////////////////////////////////////////////////////////////////// // Sentry // #define USE_SENTRY 1 #if USE_SENTRY # define SENTRY_BUILD_STATIC 1 # include // Sentry currently does not automatically add all required Windows // libraries to the linker when consumed via vcpkg # if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "sentry.lib") # pragma comment(lib, "dbghelp.lib") # pragma comment(lib, "winhttp.lib") # pragma comment(lib, "version.lib") # endif #endif ////////////////////////////////////////////////////////////////////////// // Services // #include "admin/admin.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "compute/apply.h" #include "diag/diagsvcs.h" #include "experimental/frontend.h" #include "experimental/usnjournal.h" #include "monitoring/httpstats.h" #include "monitoring/httpstatus.h" #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/gc.h" #include "zenstore/scrub.h" #define ZEN_APP_NAME "Zen store" namespace zen { using namespace std::literals; class ZenServer : public IHttpStatusProvider { public: void Initialize(ZenServiceConfig& ServiceConfig, std::string_view HttpServerClass, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { using namespace fmt::literals; m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash; if (ParentPid) { zen::ProcessHandle OwnerProcess; OwnerProcess.Initialize(ParentPid); if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); // If the pid is not reachable should we just shut down immediately? the intended owner process // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = "zen_{}"_format(BasePort); if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str()); } // Ok so now we're configured, let's kick things off m_Http = zen::CreateHttpServer(HttpServerClass); m_Http->Initialize(BasePort); m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); m_Http->RegisterService(m_StatusService); m_StatusService.RegisterHandler("status", *this); // Initialize storage and services ZEN_INFO("initializing storage"); zen::CasStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CasStore->Initialize(Config); m_CidStore = std::make_unique(*m_CasStore, m_DataRoot / "cid"); ZEN_INFO("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CasStore, m_DataRoot / "projects"); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CasStore, m_ProjectStore}); m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); ZEN_INFO("instantiating compute services"); std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique(*m_CasStore, SandboxDir); std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; zen::CreateDirectories(ApplySandboxDir); m_HttpFunctionService = std::make_unique(*m_CasStore, *m_CidStore, ApplySandboxDir); if (ServiceConfig.StructuredCacheEnabled) { InitializeStructuredCache(ServiceConfig); } else { ZEN_INFO("NOT instantiating structured cache service"); } #if ZEN_ENABLE_MESH if (ServiceConfig.MeshEnabled) { StartMesh(BasePort); } else { ZEN_INFO("NOT starting mesh"); } #endif m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); m_Http->RegisterService(m_AdminService); if (m_HttpProjectService) { m_Http->RegisterService(*m_HttpProjectService); } m_Http->RegisterService(m_CasService); if (m_StructuredCacheService) { m_Http->RegisterService(*m_StructuredCacheService); } if (m_HttpLaunchService) { m_Http->RegisterService(*m_HttpLaunchService); } if (m_HttpFunctionService) { m_Http->RegisterService(*m_HttpFunctionService); } m_FrontendService = std::make_unique(m_ContentRoot); if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } } void InitializeStructuredCache(ZenServiceConfig& ServiceConfig); #if ZEN_ENABLE_MESH void StartMesh(int BasePort) { ZEN_INFO("initializing mesh discovery"); m_ZenMesh.Start(uint16_t(BasePort)); } #endif void Run() { Scrub(); if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } if (!m_TestMode) { ZEN_INFO("__________ _________ __ "); ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if USE_SENTRY sentry_clear_modulecache(); #endif if (m_DebugOptionForcedCrash) { __debugbreak(); } const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; m_CurrentState = kRunning; m_Http->Run(IsInteractiveMode); m_CurrentState = kShuttingDown; ZEN_INFO(ZEN_APP_NAME " exiting"); m_IoContext.stop(); Flush(); } void RequestExit(int ExitCode) { RequestApplicationExit(ExitCode); m_Http->RequestExit(); } void Cleanup() { ZEN_INFO(ZEN_APP_NAME " cleaning up"); } void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } void EnsureIoRunner() { if (!m_IoRunner.joinable()) { m_IoRunner = std::move(std::jthread{[this] { m_IoContext.run(); }}); } } void EnqueueTimer() { m_PidCheckTimer.expires_after(std::chrono::seconds(1)); m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); EnsureIoRunner(); } void CheckOwnerPid() { // Pick up any new "owner" processes std::set AddedPids; for (auto& PidEntry : m_ServerEntry->SponsorPids) { if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed)) { if (PidEntry.compare_exchange_strong(ThisPid, 0)) { if (AddedPids.insert(ThisPid).second) { m_ProcessMonitor.AddPid(ThisPid); ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); } } } } if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } } void Scrub() { Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; m_CasStore->Scrub(Ctx); m_CidStore->Scrub(Ctx); m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", NiceTimeSpanMs(ElapsedTimeMs), NiceBytes(Ctx.ScrubbedBytes()), Ctx.ScrubbedChunks(), NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() { if (m_CasStore) m_CasStore->Flush(); if (m_CidStore) m_CidStore->Flush(); if (m_StructuredCacheService) m_StructuredCacheService->Flush(); if (m_ProjectStore) m_ProjectStore->Flush(); } virtual void HandleStatusRequest(HttpServerRequest& Request) override { CbObjectWriter Cbo; Cbo << "ok" << true; Cbo << "state" << ToString(m_CurrentState); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } private: ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; bool m_IsDedicatedMode = false; bool m_TestMode = false; std::filesystem::path m_DataRoot; std::filesystem::path m_ContentRoot; std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; enum ServerState { kInitializing, kRunning, kShuttingDown } m_CurrentState = kInitializing; std::string_view ToString(ServerState Value) { switch (Value) { case kInitializing: return "initializing"sv; case kRunning: return "running"sv; case kShuttingDown: return "shutdown"sv; default: return "unknown"sv; } } zen::Ref m_Http; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; std::unique_ptr m_CasStore{zen::CreateCasStore()}; std::unique_ptr m_CidStore; std::unique_ptr m_CacheStore; zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr m_ProjectStore; zen::Ref m_LocalProjectService; std::unique_ptr m_HttpLaunchService; std::unique_ptr m_HttpProjectService; std::unique_ptr m_StructuredCacheService; zen::HttpAdminService m_AdminService; zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr m_HttpFunctionService; std::unique_ptr m_FrontendService; bool m_DebugOptionForcedCrash = false; }; void ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) { using namespace std::literals; auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); m_CacheStore = std::make_unique(*m_CasStore, m_DataRoot / "cache"); std::unique_ptr UpstreamCache; if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast(UpstreamConfig.UpstreamThreadCount); } UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); if (!UpstreamConfig.ZenConfig.Urls.empty()) { std::unique_ptr ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } { zen::CloudCacheClientOptions Options; if (UpstreamConfig.JupiterConfig.UseProductionSettings) { Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, .DdcNamespace = "ue.ddc"sv, .BlobStoreNamespace = "ue.ddc"sv, .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, .UseLegacyDdc = false}; } else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) { Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, .DdcNamespace = "ue4.ddc"sv, .BlobStoreNamespace = "test.ddc"sv, .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, .UseLegacyDdc = false}; } Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; if (!Options.ServiceUrl.empty()) { std::unique_ptr JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } if (UpstreamCache->Initialize()) { ZEN_INFO("upstream cache active ({})", UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" : UpstreamOptions.ReadUpstream ? "READONLY" : UpstreamOptions.WriteUpstream ? "WRITEONLY" : "DISABLED"); } else { UpstreamCache.reset(); ZEN_INFO("NOT using upstream cache"); } } m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache))); } } // namespace zen class ZenWindowsService : public WindowsService { public: ZenWindowsService(ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) : m_GlobalOptions(GlobalOptions) , m_ServiceConfig(ServiceConfig) { } ZenWindowsService(const ZenWindowsService&) = delete; ZenWindowsService& operator=(const ZenWindowsService&) = delete; virtual int Run() override; private: ZenServerOptions& m_GlobalOptions; ZenServiceConfig& m_ServiceConfig; }; int ZenWindowsService::Run() { using namespace zen; #if USE_SENTRY // Initialize sentry.io client sentry_options_t* SentryOptions = sentry_options_new(); sentry_options_set_dsn(SentryOptions, "https://8ba3441bebc941c1ae24b8cd2fd25d55@o10593.ingest.sentry.io/5919284"); sentry_init(SentryOptions); auto _ = zen::MakeGuard([] { sentry_close(); }); #endif auto& GlobalOptions = m_GlobalOptions; auto& ServiceConfig = m_ServiceConfig; try { // Prototype config system, we'll see how this pans out // // TODO: we need to report any parse errors here ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION); ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort); if (Entry) { // Instance already running for this port? Should double check pid ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid); if (GlobalOptions.OwnerPid) { Entry->AddSponsorProcess(GlobalOptions.OwnerPid); std::exit(0); } } Entry = ServerState.Register(GlobalOptions.BasePort); if (GlobalOptions.OwnerPid) { Entry->AddSponsorProcess(GlobalOptions.OwnerPid); } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; zen::ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << GlobalOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); ZenServer Server; Server.SetDataRoot(GlobalOptions.DataDir); Server.SetContentRoot(GlobalOptions.ContentDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); Server.Initialize(ServiceConfig, GlobalOptions.HttpServerClass, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal received"); Server.RequestExit(0); } }}); // If we have a parent process, establish the mechanisms we need // to be able to communicate with the parent if (!GlobalOptions.ChildId.empty()) { zen::NamedEvent ParentEvent{GlobalOptions.ChildId}; ParentEvent.Set(); } Server.Run(); Server.Cleanup(); ShutdownEvent->Set(); ShutdownThread->join(); } catch (std::exception& e) { SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); } ShutdownLogging(); return 0; } int main(int argc, char* argv[]) { using namespace zen; #if ZEN_USE_MIMALLOC mi_version(); #endif ZenServerOptions GlobalOptions; ZenServiceConfig ServiceConfig; ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); InitializeLogging(GlobalOptions); #if ZEN_PLATFORM_WINDOWS if (GlobalOptions.InstallService) { WindowsService::Install(); std::exit(0); } if (GlobalOptions.UninstallService) { WindowsService::Delete(); std::exit(0); } #endif ZenWindowsService App(GlobalOptions, ServiceConfig); return App.ServiceMain(); }