diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/zenserver.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/zenserver.cpp')
| -rw-r--r-- | src/zenserver/zenserver.cpp | 1261 |
1 files changed, 1261 insertions, 0 deletions
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp new file mode 100644 index 000000000..635fd04e0 --- /dev/null +++ b/src/zenserver/zenserver.cpp @@ -0,0 +1,1261 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/config.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/refcount.h> +#include <zencore/scopeguard.h> +#include <zencore/session.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/websocket.h> +#include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/basicfile.h> +#include <zenutil/zenserverprocess.h> + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#endif + +#if ZEN_USE_MIMALLOC +ZEN_THIRD_PARTY_INCLUDES_START +# include <mimalloc-new-delete.h> +# include <mimalloc.h> +ZEN_THIRD_PARTY_INCLUDES_END +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <asio.hpp> +#include <lua.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <exception> +#include <list> +#include <optional> +#include <regex> +#include <set> +#include <unordered_map> + +////////////////////////////////////////////////////////////////////////// +// We don't have any doctest code in this file but this is needed to bring +// in some shared code into the executable + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include <zencore/testing.h> +#endif + +////////////////////////////////////////////////////////////////////////// + +#include "config.h" +#include "diag/logging.h" + +#if ZEN_PLATFORM_WINDOWS +# include "windows/service.h" +#endif + +////////////////////////////////////////////////////////////////////////// +// Sentry +// + +#if !defined(ZEN_USE_SENTRY) +# if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64 +// vcpkg's sentry-native port does not support Arm on Mac. +# define ZEN_USE_SENTRY 0 +# else +# define ZEN_USE_SENTRY 1 +# endif +#endif + +#if ZEN_USE_SENTRY +# define SENTRY_BUILD_STATIC 1 +ZEN_THIRD_PARTY_INCLUDES_START +# include <sentry.h> +# include <spdlog/sinks/base_sink.h> +ZEN_THIRD_PARTY_INCLUDES_END + +// Sentry currently does not automatically add all required Windows +// libraries to the linker when consumed via vcpkg + +# if ZEN_PLATFORM_WINDOWS +# pragma comment(lib, "sentry.lib") +# pragma comment(lib, "dbghelp.lib") +# pragma comment(lib, "winhttp.lib") +# pragma comment(lib, "version.lib") +# endif +#endif + +////////////////////////////////////////////////////////////////////////// +// Services +// + +#include "admin/admin.h" +#include "auth/authmgr.h" +#include "auth/authservice.h" +#include "cache/structuredcache.h" +#include "cache/structuredcachestore.h" +#include "cidstore.h" +#include "compute/function.h" +#include "diag/diagsvcs.h" +#include "frontend/frontend.h" +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" +#include "objectstore/objectstore.h" +#include "projectstore/projectstore.h" +#include "testing/httptest.h" +#include "upstream/upstream.h" +#include "zenstore/gc.h" + +#define ZEN_APP_NAME "Zen store" + +namespace zen { + +using namespace std::literals; + +namespace utils { +#if ZEN_USE_SENTRY + class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex> + { + public: + sentry_sink() {} + + protected: + static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG, + SENTRY_LEVEL_DEBUG, + SENTRY_LEVEL_INFO, + SENTRY_LEVEL_WARNING, + SENTRY_LEVEL_ERROR, + SENTRY_LEVEL_FATAL, + SENTRY_LEVEL_DEBUG}; + + void sink_it_(const spdlog::details::log_msg& msg) override + { + std::string Message = fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname); + sentry_value_t event = sentry_value_new_message_event( + /* level */ MapToSentryLevel[msg.level], + /* logger */ nullptr, + /* message */ Message.c_str()); + sentry_event_value_add_stacktrace(event, NULL, 0); + sentry_capture_event(event); + } + void flush_() override {} + }; +#endif + + asio::error_code ResolveHostname(asio::io_context& Ctx, + std::string_view Host, + std::string_view DefaultPort, + std::vector<std::string>& OutEndpoints) + { + std::string_view Port = DefaultPort; + + if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) + { + Port = Host.substr(Idx + 1); + Host = Host.substr(0, Idx); + } + + asio::ip::tcp::resolver Resolver(Ctx); + + asio::error_code ErrorCode; + asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); + + if (!ErrorCode) + { + for (const asio::ip::tcp::endpoint Ep : Endpoints) + { + OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port())); + } + } + + return ErrorCode; + } +} // namespace utils + +class ZenServer : public IHttpStatusProvider +{ +public: + int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) + { + m_UseSentry = ServerOptions.NoSentry == false; + m_ServerEntry = ServerEntry; + m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; + const int ParentPid = ServerOptions.OwnerPid; + + if (ParentPid) + { + zen::ProcessHandle OwnerProcess; + OwnerProcess.Initialize(ParentPid); + + if (!OwnerProcess.IsValid()) + { + ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); + + // If the pid is not reachable should we just shut down immediately? the intended owner process + // could have been killed or somehow crashed already + } + else + { + ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); + } + + m_ProcessMonitor.AddPid(ParentPid); + } + + // Initialize/check mutex based on base port + + std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); + + if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) + { + throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str()); + } + + InitializeState(ServerOptions); + + m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, + .AbsLogPath = ServerOptions.AbsLogFile, + .HttpServerClass = std::string(ServerOptions.HttpServerClass), + .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); + + // Ok so now we're configured, let's kick things off + + m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); + int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); + + if (ServerOptions.WebSocketPort != 0) + { + const uint32 ThreadCount = + ServerOptions.WebSocketThreads > 0 ? uint32_t(ServerOptions.WebSocketThreads) : std::thread::hardware_concurrency(); + + m_WebSocket = zen::WebSocketServer::Create( + {.Port = gsl::narrow<uint16_t>(ServerOptions.WebSocketPort), .ThreadCount = Max(ThreadCount, uint32_t(16))}); + } + + // Setup authentication manager + { + std::string EncryptionKey = ServerOptions.EncryptionKey; + + if (EncryptionKey.empty()) + { + EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; + + ZEN_WARN("using default encryption key"); + } + + std::string EncryptionIV = ServerOptions.EncryptionIV; + + if (EncryptionIV.empty()) + { + EncryptionIV = "0123456789abcdef"; + + ZEN_WARN("using default encryption initialization vector"); + } + + m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", + .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), + .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); + + for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) + { + m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); + } + } + + m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr); + m_Http->RegisterService(*m_AuthService); + + m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_StatusService); + m_StatusService.RegisterHandler("status", *this); + + // Initialize storage and services + + ZEN_INFO("initializing storage"); + + zen::CidStoreConfiguration Config; + Config.RootDirectory = m_DataRoot / "cas"; + + m_CidStore = std::make_unique<zen::CidStore>(m_GcManager); + m_CidStore->Initialize(Config); + m_CidService.reset(new zen::HttpCidService{*m_CidStore}); + + ZEN_INFO("instantiating project service"); + + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); + m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); + +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeServiceEnabled) + { + InitializeCompute(ServerOptions); + } + else + { + ZEN_INFO("NOT instantiating compute services"); + } +#endif // ZEN_WITH_COMPUTE_SERVICES + + if (ServerOptions.StructuredCacheEnabled) + { + InitializeStructuredCache(ServerOptions); + } + else + { + ZEN_INFO("NOT instantiating structured cache service"); + } + + m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics + m_Http->RegisterService(m_TestingService); + m_Http->RegisterService(m_AdminService); + + if (m_WebSocket) + { + m_WebSocket->RegisterService(m_TestingService); + } + + if (m_HttpProjectService) + { + m_Http->RegisterService(*m_HttpProjectService); + } + + m_Http->RegisterService(*m_CidService); + +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeServiceEnabled) + { + if (m_HttpFunctionService != nullptr) + { + m_Http->RegisterService(*m_HttpFunctionService); + } + } +#endif // ZEN_WITH_COMPUTE_SERVICES + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + + if (m_FrontendService) + { + m_Http->RegisterService(*m_FrontendService); + } + + if (ServerOptions.ObjectStoreEnabled) + { + ObjectStoreConfig ObjCfg; + ObjCfg.RootDirectory = m_DataRoot / "obj"; + ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort); + + for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) + { + ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; + NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; + ObjCfg.Buckets.push_back(std::move(NewBucket)); + } + + m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg)); + m_Http->RegisterService(*m_ObjStoreService); + } + + ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); + zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), + .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, + .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, + .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit}; + m_GcScheduler.Initialize(GcConfig); + + return EffectiveBasePort; + } + + void InitializeState(const ZenServerOptions& ServerOptions); + void InitializeStructuredCache(const ZenServerOptions& ServerOptions); + void InitializeCompute(const ZenServerOptions& ServerOptions); + + void Run() + { + // This is disabled for now, awaiting better scheduling + // + // Scrub(); + + if (m_ProcessMonitor.IsActive()) + { + EnqueueTimer(); + } + + if (!m_TestMode) + { + ZEN_INFO("__________ _________ __ "); + ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); + ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); + ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); + ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); + ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); + } + + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + sentry_clear_modulecache(); + } +#endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + if (m_WebSocket) + { + m_WebSocket->Run(); + } + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); + + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + Flush(); + } + + void RequestExit(int ExitCode) + { + RequestApplicationExit(ExitCode); + m_Http->RequestExit(); + } + + void Cleanup() + { + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + m_GcScheduler.Shutdown(); + } + + void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetTestMode(bool State) { m_TestMode = State; } + void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + + std::function<void()> m_IsReadyFunc; + void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } + void OnReady(); + + void EnsureIoRunner() + { + if (!m_IoRunner.joinable()) + { + m_IoRunner = std::thread{[this] { m_IoContext.run(); }}; + } + } + + void EnqueueTimer() + { + m_PidCheckTimer.expires_after(std::chrono::seconds(1)); + m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); + + EnsureIoRunner(); + } + + void CheckOwnerPid() + { + // Pick up any new "owner" processes + + std::set<uint32_t> AddedPids; + + for (auto& PidEntry : m_ServerEntry->SponsorPids) + { + if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) + { + if (PidEntry.compare_exchange_strong(ThisPid, 0)) + { + if (AddedPids.insert(ThisPid).second) + { + m_ProcessMonitor.AddPid(ThisPid); + + ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); + } + } + } + } + + if (m_ProcessMonitor.IsRunning()) + { + EnqueueTimer(); + } + else + { + ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); + + RequestExit(0); + } + } + + void Scrub() + { + Stopwatch Timer; + ZEN_INFO("Storage validation STARTING"); + + ScrubContext Ctx; + m_CidStore->Scrub(Ctx); + m_ProjectStore->Scrub(Ctx); + m_StructuredCacheService->Scrub(Ctx); + + const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", + NiceTimeSpanMs(ElapsedTimeMs), + NiceBytes(Ctx.ScrubbedBytes()), + Ctx.ScrubbedChunks(), + NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); + } + + void Flush() + { + if (m_CidStore) + m_CidStore->Flush(); + + if (m_StructuredCacheService) + m_StructuredCacheService->Flush(); + + if (m_ProjectStore) + m_ProjectStore->Flush(); + } + + virtual void HandleStatusRequest(HttpServerRequest& Request) override + { + CbObjectWriter Cbo; + Cbo << "ok" << true; + Cbo << "state" << ToString(m_CurrentState); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + +private: + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + CbObject m_RootManifest; + std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; + std::thread m_IoRunner; + asio::io_context m_IoContext; + asio::steady_timer m_PidCheckTimer{m_IoContext}; + zen::ProcessMonitor m_ProcessMonitor; + zen::NamedMutex m_ServerMutex; + + enum ServerState + { + kInitializing, + kRunning, + kShuttingDown + } m_CurrentState = kInitializing; + + inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } + + std::string_view ToString(ServerState Value) + { + switch (Value) + { + case kInitializing: + return "initializing"sv; + case kRunning: + return "running"sv; + case kShuttingDown: + return "shutdown"sv; + default: + return "unknown"sv; + } + } + + zen::Ref<zen::HttpServer> m_Http; + std::unique_ptr<zen::WebSocketServer> m_WebSocket; + std::unique_ptr<zen::AuthMgr> m_AuthMgr; + std::unique_ptr<zen::HttpAuthService> m_AuthService; + zen::HttpStatusService m_StatusService; + zen::HttpStatsService m_StatsService; + zen::GcManager m_GcManager; + zen::GcScheduler m_GcScheduler{m_GcManager}; + std::unique_ptr<zen::CidStore> m_CidStore; + std::unique_ptr<zen::ZenCacheStore> m_CacheStore; + zen::HttpTestService m_TestService; + zen::HttpTestingService m_TestingService; + std::unique_ptr<zen::HttpCidService> m_CidService; + zen::RefPtr<zen::ProjectStore> m_ProjectStore; + std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; + std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; + std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService; + std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; + zen::HttpAdminService m_AdminService{m_GcScheduler}; + zen::HttpHealthService m_HealthService; +#if ZEN_WITH_COMPUTE_SERVICES + std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; +#endif // ZEN_WITH_COMPUTE_SERVICES + std::unique_ptr<zen::HttpFrontendService> m_FrontendService; + std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService; + + bool m_DebugOptionForcedCrash = false; + bool m_UseSentry = false; +}; + +void +ZenServer::OnReady() +{ + m_ServerEntry->SignalReady(); + + if (m_IsReadyFunc) + { + m_IsReadyFunc(); + } +} + +void +ZenServer::InitializeState(const ZenServerOptions& ServerOptions) +{ + // Check root manifest to deal with schema versioning + + bool WipeState = false; + std::string WipeReason = "Unspecified"; + + bool UpdateManifest = false; + std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; + FileContents ManifestData = zen::ReadFile(ManifestPath); + + if (ManifestData.ErrorCode) + { + if (ServerOptions.IsFirstRun) + { + ZEN_INFO("Initializing state at '{}'", m_DataRoot); + + UpdateManifest = true; + } + else + { + WipeState = true; + WipeReason = fmt::format("No manifest present at '{}'", ManifestPath); + } + } + else + { + IoBuffer Manifest = ManifestData.Flatten(); + + if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); + ValidationResult != CbValidateError::None) + { + ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); + + WipeState = true; + WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult)); + } + else + { + m_RootManifest = LoadCompactBinaryObject(Manifest); + + const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); + + if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) + { + WipeState = true; + WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); + } + } + } + + // Release any open handles so we can overwrite the manifest + ManifestData = {}; + + // Handle any state wipe + + if (WipeState) + { + ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); + + std::error_code Ec; + for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) + { + if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) + { + ZEN_INFO("Deleting '{}'", DirEntry.path()); + + std::filesystem::remove_all(DirEntry.path(), Ec); + + if (Ec) + { + ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); + } + } + } + + ZEN_INFO("Wiped all directories in data root"); + + UpdateManifest = true; + } + + if (UpdateManifest) + { + // Write new manifest + + const DateTime Now = DateTime::Now(); + + CbObjectWriter Cbo; + Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid(); + + m_RootManifest = Cbo.Save(); + + WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); + } +} + +void +ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) +{ + using namespace std::literals; + + ZEN_INFO("instantiating structured cache service"); + m_CacheStore = std::make_unique<ZenCacheStore>( + m_GcManager, + ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true}); + + const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; + + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } + + m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr); + m_UpstreamCache->Initialize(); + + if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { + // Zen upstream + { + std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls; + if (!UpstreamConfig.ZenConfig.Dns.empty()) + { + for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) + { + if (!Dns.empty()) + { + const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); + if (Err) + { + ZEN_ERROR("resolve FAILED, reason '{}'", Err.message()); + } + } + } + } + + std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); }); + + if (!ZenUrls.empty()) + { + const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; + + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint( + {.Name = ZenEndpointName, + .Urls = ZenUrls, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); + + m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } + } + + // Jupiter upstream + if (UpstreamConfig.JupiterConfig.Url.empty() == false) + { + std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; + + auto Options = + zen::CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, + .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; + + std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = + zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); + + m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + } + } + + m_StructuredCacheService = + std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache); + + m_Http->RegisterService(*m_StructuredCacheService); + m_Http->RegisterService(*m_UpstreamService); +} + +#if ZEN_WITH_COMPUTE_SERVICES +void +ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) +{ + ServerOptions; + const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; + + // Horde compute upstream + if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false) + { + ZEN_INFO("instantiating compute service"); + + std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; + + auto ComputeOptions = + zen::CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.HordeConfig.Url, + .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; + + auto StorageOptions = + zen::CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, + .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; + + m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore, + ComputeOptions, + StorageOptions, + ComputeAuthConfig, + StorageAuthConfig, + *m_AuthMgr); + } + else + { + ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)"); + } +} +#endif // ZEN_WITH_COMPUTE_SERVICES + +//////////////////////////////////////////////////////////////////////////////// + +class ZenEntryPoint +{ +public: + ZenEntryPoint(ZenServerOptions& ServerOptions); + ZenEntryPoint(const ZenEntryPoint&) = delete; + ZenEntryPoint& operator=(const ZenEntryPoint&) = delete; + int Run(); + +private: + ZenServerOptions& m_ServerOptions; + zen::LockFile m_LockFile; +}; + +ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions) +{ +} + +#if ZEN_USE_SENTRY +static void +SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata) +{ + char LogMessageBuffer[160]; + std::string LogMessage; + const char* MessagePtr = LogMessageBuffer; + + int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args); + + if (n >= int(sizeof LogMessageBuffer)) + { + LogMessage.resize(n + 1); + + n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args); + + MessagePtr = LogMessage.c_str(); + } + + switch (Level) + { + case SENTRY_LEVEL_DEBUG: + ConsoleLog().debug("sentry: {}", MessagePtr); + break; + + case SENTRY_LEVEL_INFO: + ConsoleLog().info("sentry: {}", MessagePtr); + break; + + case SENTRY_LEVEL_WARNING: + ConsoleLog().warn("sentry: {}", MessagePtr); + break; + + case SENTRY_LEVEL_ERROR: + ConsoleLog().error("sentry: {}", MessagePtr); + break; + + case SENTRY_LEVEL_FATAL: + ConsoleLog().critical("sentry: {}", MessagePtr); + break; + } +} +#endif + +int +ZenEntryPoint::Run() +{ +#if ZEN_USE_SENTRY + std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native"); + int SentryErrorCode = 0; + if (m_ServerOptions.NoSentry == false) + { + sentry_options_t* SentryOptions = sentry_options_new(); + sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); + if (SentryDatabasePath.starts_with("\\\\?\\")) + { + SentryDatabasePath = SentryDatabasePath.substr(4); + } + sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); + sentry_options_set_logger(SentryOptions, SentryLogFunction, this); + std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string(); + if (SentryAttachmentPath.starts_with("\\\\?\\")) + { + SentryAttachmentPath = SentryAttachmentPath.substr(4); + } + sentry_options_add_attachment(SentryOptions, SentryAttachmentPath.c_str()); + sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION); + // sentry_options_set_debug(SentryOptions, 1); + + SentryErrorCode = sentry_init(SentryOptions); + + auto SentrySink = spdlog::create<utils::sentry_sink>("sentry"); + zen::logging::SetErrorLog(std::move(SentrySink)); + } + + auto _ = zen::MakeGuard([] { + zen::logging::SetErrorLog(std::shared_ptr<spdlog::logger>()); + sentry_close(); + }); +#endif + + auto& ServerOptions = m_ServerOptions; + + try + { + // Mutual exclusion and synchronization + ZenServerState ServerState; + ServerState.Initialize(); + ServerState.Sweep(); + + ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort); + + if (Entry) + { + if (ServerOptions.OwnerPid) + { + ConsoleLog().info( + "Looks like there is already a process listening to this port {} (pid: {}), attaching owner pid {} to running instance", + ServerOptions.BasePort, + Entry->Pid, + ServerOptions.OwnerPid); + + Entry->AddSponsorProcess(ServerOptions.OwnerPid); + + std::exit(0); + } + else + { + ConsoleLog().warn("Exiting since there is already a process listening to port {} (pid: {})", + ServerOptions.BasePort, + Entry->Pid); + std::exit(1); + } + } + + std::error_code Ec; + + std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock"; + + bool IsReady = false; + + auto MakeLockData = [&] { + CbObjectWriter Cbo; + Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort + << "session_id" << GetSessionId() << "ready" << IsReady; + return Cbo.Save(); + }; + + m_LockFile.Create(LockFilePath, MakeLockData(), Ec); + + if (Ec) + { + ConsoleLog().warn("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message()); + + std::exit(99); + } + + InitializeLogging(ServerOptions); + +#if ZEN_USE_SENTRY + if (m_ServerOptions.NoSentry == false) + { + if (SentryErrorCode == 0) + { + ZEN_INFO("sentry initialized"); + } + else + { + ZEN_WARN("sentry_init returned failure! (error code: {})", SentryErrorCode); + } + } +#endif + + MaximizeOpenFileCount(); + + ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath); + + ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL); + + Entry = ServerState.Register(ServerOptions.BasePort); + + if (ServerOptions.OwnerPid) + { + Entry->AddSponsorProcess(ServerOptions.OwnerPid); + } + + ZenServer Server; + Server.SetDataRoot(ServerOptions.DataDir); + Server.SetContentRoot(ServerOptions.ContentDir); + Server.SetTestMode(ServerOptions.IsTest); + Server.SetDedicatedMode(ServerOptions.IsDedicated); + + int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<zen::NamedEvent> ShutdownEvent; + + zen::ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal received"); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + IsReady = true; + + m_LockFile.Update(MakeLockData(), Ec); + + if (!ServerOptions.ChildId.empty()) + { + zen::NamedEvent ParentEvent{ServerOptions.ChildId}; + ParentEvent.Set(); + } + }); + + Server.Run(); + Server.Cleanup(); + + ShutdownEvent->Set(); + ShutdownThread->join(); + } + catch (std::exception& e) + { + SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); + } + + ShutdownLogging(); + + return 0; +} + +} // namespace zen + +//////////////////////////////////////////////////////////////////////////////// + +#if ZEN_PLATFORM_WINDOWS + +class ZenWindowsService : public WindowsService +{ +public: + ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {} + + ZenWindowsService(const ZenWindowsService&) = delete; + ZenWindowsService& operator=(const ZenWindowsService&) = delete; + + virtual int Run() override; + +private: + zen::ZenEntryPoint m_EntryPoint; +}; + +int +ZenWindowsService::Run() +{ + return m_EntryPoint.Run(); +} + +#endif // ZEN_PLATFORM_WINDOWS + +//////////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS +int +test_main(int argc, char** argv) +{ + zen::zencore_forcelinktests(); + zen::zenhttp_forcelinktests(); + zen::zenstore_forcelinktests(); + zen::z$_forcelink(); + zen::z$service_forcelink(); + + zen::logging::InitializeLogging(); + spdlog::set_level(spdlog::level::debug); + + zen::MaximizeOpenFileCount(); + + return ZEN_RUN_TESTS(argc, argv); +} +#endif + +int +main(int argc, char* argv[]) +{ + using namespace zen; + +#if ZEN_USE_MIMALLOC + mi_version(); +#endif + +#if ZEN_WITH_TESTS + if (argc >= 2) + { + if (argv[1] == "test"sv) + { + return test_main(argc, argv); + } + } +#endif + + try + { + ZenServerOptions ServerOptions; + ParseCliOptions(argc, argv, ServerOptions); + + if (!std::filesystem::exists(ServerOptions.DataDir)) + { + ServerOptions.IsFirstRun = true; + std::filesystem::create_directories(ServerOptions.DataDir); + } + +#if ZEN_WITH_TRACE + if (ServerOptions.TraceHost.size()) + { + TraceInit(ServerOptions.TraceHost.c_str(), TraceType::Network); + } + else if (ServerOptions.TraceFile.size()) + { + TraceInit(ServerOptions.TraceFile.c_str(), TraceType::File); + } + else + { + TraceInit(nullptr, TraceType::None); + } +#endif // ZEN_WITH_TRACE + +#if ZEN_PLATFORM_WINDOWS + if (ServerOptions.InstallService) + { + WindowsService::Install(); + + std::exit(0); + } + + if (ServerOptions.UninstallService) + { + WindowsService::Delete(); + + std::exit(0); + } + + ZenWindowsService App(ServerOptions); + return App.ServiceMain(); +#else + if (ServerOptions.InstallService || ServerOptions.UninstallService) + { + throw std::runtime_error("Service mode is not supported on this platform"); + } + + ZenEntryPoint App(ServerOptions); + return App.Run(); +#endif // ZEN_PLATFORM_WINDOWS + } + catch (std::exception& Ex) + { + fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what()); + + return 1; + } +} |