aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/zenserver.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/zenserver.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/zenserver.cpp')
-rw-r--r--src/zenserver/zenserver.cpp1261
1 files changed, 1261 insertions, 0 deletions
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
new file mode 100644
index 000000000..635fd04e0
--- /dev/null
+++ b/src/zenserver/zenserver.cpp
@@ -0,0 +1,1261 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/config.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zencore/refcount.h>
+#include <zencore/scopeguard.h>
+#include <zencore/session.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenhttp/httpserver.h>
+#include <zenhttp/websocket.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/basicfile.h>
+#include <zenutil/zenserverprocess.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
+#if ZEN_USE_MIMALLOC
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <mimalloc-new-delete.h>
+# include <mimalloc.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <asio.hpp>
+#include <lua.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <exception>
+#include <list>
+#include <optional>
+#include <regex>
+#include <set>
+#include <unordered_map>
+
+//////////////////////////////////////////////////////////////////////////
+// We don't have any doctest code in this file but this is needed to bring
+// in some shared code into the executable
+
+#if ZEN_WITH_TESTS
+# define ZEN_TEST_WITH_RUNNER 1
+# include <zencore/testing.h>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+#include "config.h"
+#include "diag/logging.h"
+
+#if ZEN_PLATFORM_WINDOWS
+# include "windows/service.h"
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+// Sentry
+//
+
+#if !defined(ZEN_USE_SENTRY)
+# if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64
+// vcpkg's sentry-native port does not support Arm on Mac.
+# define ZEN_USE_SENTRY 0
+# else
+# define ZEN_USE_SENTRY 1
+# endif
+#endif
+
+#if ZEN_USE_SENTRY
+# define SENTRY_BUILD_STATIC 1
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <sentry.h>
+# include <spdlog/sinks/base_sink.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+// Sentry currently does not automatically add all required Windows
+// libraries to the linker when consumed via vcpkg
+
+# if ZEN_PLATFORM_WINDOWS
+# pragma comment(lib, "sentry.lib")
+# pragma comment(lib, "dbghelp.lib")
+# pragma comment(lib, "winhttp.lib")
+# pragma comment(lib, "version.lib")
+# endif
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+// Services
+//
+
+#include "admin/admin.h"
+#include "auth/authmgr.h"
+#include "auth/authservice.h"
+#include "cache/structuredcache.h"
+#include "cache/structuredcachestore.h"
+#include "cidstore.h"
+#include "compute/function.h"
+#include "diag/diagsvcs.h"
+#include "frontend/frontend.h"
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
+#include "objectstore/objectstore.h"
+#include "projectstore/projectstore.h"
+#include "testing/httptest.h"
+#include "upstream/upstream.h"
+#include "zenstore/gc.h"
+
+#define ZEN_APP_NAME "Zen store"
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace utils {
+#if ZEN_USE_SENTRY
+ class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex>
+ {
+ public:
+ sentry_sink() {}
+
+ protected:
+ static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG,
+ SENTRY_LEVEL_DEBUG,
+ SENTRY_LEVEL_INFO,
+ SENTRY_LEVEL_WARNING,
+ SENTRY_LEVEL_ERROR,
+ SENTRY_LEVEL_FATAL,
+ SENTRY_LEVEL_DEBUG};
+
+ void sink_it_(const spdlog::details::log_msg& msg) override
+ {
+ std::string Message = fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname);
+ sentry_value_t event = sentry_value_new_message_event(
+ /* level */ MapToSentryLevel[msg.level],
+ /* logger */ nullptr,
+ /* message */ Message.c_str());
+ sentry_event_value_add_stacktrace(event, NULL, 0);
+ sentry_capture_event(event);
+ }
+ void flush_() override {}
+ };
+#endif
+
+ asio::error_code ResolveHostname(asio::io_context& Ctx,
+ std::string_view Host,
+ std::string_view DefaultPort,
+ std::vector<std::string>& OutEndpoints)
+ {
+ std::string_view Port = DefaultPort;
+
+ if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos)
+ {
+ Port = Host.substr(Idx + 1);
+ Host = Host.substr(0, Idx);
+ }
+
+ asio::ip::tcp::resolver Resolver(Ctx);
+
+ asio::error_code ErrorCode;
+ asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode);
+
+ if (!ErrorCode)
+ {
+ for (const asio::ip::tcp::endpoint Ep : Endpoints)
+ {
+ OutEndpoints.push_back(fmt::format("http://{}:{}", Ep.address().to_string(), Ep.port()));
+ }
+ }
+
+ return ErrorCode;
+ }
+} // namespace utils
+
+class ZenServer : public IHttpStatusProvider
+{
+public:
+ int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
+ {
+ m_UseSentry = ServerOptions.NoSentry == false;
+ m_ServerEntry = ServerEntry;
+ m_DebugOptionForcedCrash = ServerOptions.ShouldCrash;
+ const int ParentPid = ServerOptions.OwnerPid;
+
+ if (ParentPid)
+ {
+ zen::ProcessHandle OwnerProcess;
+ OwnerProcess.Initialize(ParentPid);
+
+ if (!OwnerProcess.IsValid())
+ {
+ ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid);
+
+ // If the pid is not reachable should we just shut down immediately? the intended owner process
+ // could have been killed or somehow crashed already
+ }
+ else
+ {
+ ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid);
+ }
+
+ m_ProcessMonitor.AddPid(ParentPid);
+ }
+
+ // Initialize/check mutex based on base port
+
+ std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort);
+
+ if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false)))
+ {
+ throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str());
+ }
+
+ InitializeState(ServerOptions);
+
+ m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot,
+ .AbsLogPath = ServerOptions.AbsLogFile,
+ .HttpServerClass = std::string(ServerOptions.HttpServerClass),
+ .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)});
+
+ // Ok so now we're configured, let's kick things off
+
+ m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass);
+ int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort);
+
+ if (ServerOptions.WebSocketPort != 0)
+ {
+ const uint32 ThreadCount =
+ ServerOptions.WebSocketThreads > 0 ? uint32_t(ServerOptions.WebSocketThreads) : std::thread::hardware_concurrency();
+
+ m_WebSocket = zen::WebSocketServer::Create(
+ {.Port = gsl::narrow<uint16_t>(ServerOptions.WebSocketPort), .ThreadCount = Max(ThreadCount, uint32_t(16))});
+ }
+
+ // Setup authentication manager
+ {
+ std::string EncryptionKey = ServerOptions.EncryptionKey;
+
+ if (EncryptionKey.empty())
+ {
+ EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456";
+
+ ZEN_WARN("using default encryption key");
+ }
+
+ std::string EncryptionIV = ServerOptions.EncryptionIV;
+
+ if (EncryptionIV.empty())
+ {
+ EncryptionIV = "0123456789abcdef";
+
+ ZEN_WARN("using default encryption initialization vector");
+ }
+
+ m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth",
+ .EncryptionKey = AesKey256Bit::FromString(EncryptionKey),
+ .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)});
+
+ for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders)
+ {
+ m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId});
+ }
+ }
+
+ m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr);
+ m_Http->RegisterService(*m_AuthService);
+
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+ m_StatusService.RegisterHandler("status", *this);
+
+ // Initialize storage and services
+
+ ZEN_INFO("initializing storage");
+
+ zen::CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
+
+ m_CidStore = std::make_unique<zen::CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+ m_CidService.reset(new zen::HttpCidService{*m_CidStore});
+
+ ZEN_INFO("instantiating project service");
+
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
+ m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
+
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeServiceEnabled)
+ {
+ InitializeCompute(ServerOptions);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating compute services");
+ }
+#endif // ZEN_WITH_COMPUTE_SERVICES
+
+ if (ServerOptions.StructuredCacheEnabled)
+ {
+ InitializeStructuredCache(ServerOptions);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating structured cache service");
+ }
+
+ m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
+ m_Http->RegisterService(m_TestingService);
+ m_Http->RegisterService(m_AdminService);
+
+ if (m_WebSocket)
+ {
+ m_WebSocket->RegisterService(m_TestingService);
+ }
+
+ if (m_HttpProjectService)
+ {
+ m_Http->RegisterService(*m_HttpProjectService);
+ }
+
+ m_Http->RegisterService(*m_CidService);
+
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeServiceEnabled)
+ {
+ if (m_HttpFunctionService != nullptr)
+ {
+ m_Http->RegisterService(*m_HttpFunctionService);
+ }
+ }
+#endif // ZEN_WITH_COMPUTE_SERVICES
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+
+ if (m_FrontendService)
+ {
+ m_Http->RegisterService(*m_FrontendService);
+ }
+
+ if (ServerOptions.ObjectStoreEnabled)
+ {
+ ObjectStoreConfig ObjCfg;
+ ObjCfg.RootDirectory = m_DataRoot / "obj";
+ ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort);
+
+ for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets)
+ {
+ ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name};
+ NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory;
+ ObjCfg.Buckets.push_back(std::move(NewBucket));
+ }
+
+ m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg));
+ m_Http->RegisterService(*m_ObjStoreService);
+ }
+
+ ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
+ zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
+ .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
+ .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit};
+ m_GcScheduler.Initialize(GcConfig);
+
+ return EffectiveBasePort;
+ }
+
+ void InitializeState(const ZenServerOptions& ServerOptions);
+ void InitializeStructuredCache(const ZenServerOptions& ServerOptions);
+ void InitializeCompute(const ZenServerOptions& ServerOptions);
+
+ void Run()
+ {
+ // This is disabled for now, awaiting better scheduling
+ //
+ // Scrub();
+
+ if (m_ProcessMonitor.IsActive())
+ {
+ EnqueueTimer();
+ }
+
+ if (!m_TestMode)
+ {
+ ZEN_INFO("__________ _________ __ ");
+ ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ ");
+ ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ ");
+ ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ ");
+ ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >");
+ ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ ");
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId());
+
+#if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ sentry_clear_modulecache();
+ }
+#endif
+
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ if (m_WebSocket)
+ {
+ m_WebSocket->Run();
+ }
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ Flush();
+ }
+
+ void RequestExit(int ExitCode)
+ {
+ RequestApplicationExit(ExitCode);
+ m_Http->RequestExit();
+ }
+
+ void Cleanup()
+ {
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ m_GcScheduler.Shutdown();
+ }
+
+ void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
+ void SetTestMode(bool State) { m_TestMode = State; }
+ void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
+
+ std::function<void()> m_IsReadyFunc;
+ void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); }
+ void OnReady();
+
+ void EnsureIoRunner()
+ {
+ if (!m_IoRunner.joinable())
+ {
+ m_IoRunner = std::thread{[this] { m_IoContext.run(); }};
+ }
+ }
+
+ void EnqueueTimer()
+ {
+ m_PidCheckTimer.expires_after(std::chrono::seconds(1));
+ m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); });
+
+ EnsureIoRunner();
+ }
+
+ void CheckOwnerPid()
+ {
+ // Pick up any new "owner" processes
+
+ std::set<uint32_t> AddedPids;
+
+ for (auto& PidEntry : m_ServerEntry->SponsorPids)
+ {
+ if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed))
+ {
+ if (PidEntry.compare_exchange_strong(ThisPid, 0))
+ {
+ if (AddedPids.insert(ThisPid).second)
+ {
+ m_ProcessMonitor.AddPid(ThisPid);
+
+ ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid);
+ }
+ }
+ }
+ }
+
+ if (m_ProcessMonitor.IsRunning())
+ {
+ EnqueueTimer();
+ }
+ else
+ {
+ ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone");
+
+ RequestExit(0);
+ }
+ }
+
+ void Scrub()
+ {
+ Stopwatch Timer;
+ ZEN_INFO("Storage validation STARTING");
+
+ ScrubContext Ctx;
+ m_CidStore->Scrub(Ctx);
+ m_ProjectStore->Scrub(Ctx);
+ m_StructuredCacheService->Scrub(Ctx);
+
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
+ }
+
+ void Flush()
+ {
+ if (m_CidStore)
+ m_CidStore->Flush();
+
+ if (m_StructuredCacheService)
+ m_StructuredCacheService->Flush();
+
+ if (m_ProjectStore)
+ m_ProjectStore->Flush();
+ }
+
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override
+ {
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo << "state" << ToString(m_CurrentState);
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+private:
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ CbObject m_RootManifest;
+ std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
+ std::thread m_IoRunner;
+ asio::io_context m_IoContext;
+ asio::steady_timer m_PidCheckTimer{m_IoContext};
+ zen::ProcessMonitor m_ProcessMonitor;
+ zen::NamedMutex m_ServerMutex;
+
+ enum ServerState
+ {
+ kInitializing,
+ kRunning,
+ kShuttingDown
+ } m_CurrentState = kInitializing;
+
+ inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
+
+ std::string_view ToString(ServerState Value)
+ {
+ switch (Value)
+ {
+ case kInitializing:
+ return "initializing"sv;
+ case kRunning:
+ return "running"sv;
+ case kShuttingDown:
+ return "shutdown"sv;
+ default:
+ return "unknown"sv;
+ }
+ }
+
+ zen::Ref<zen::HttpServer> m_Http;
+ std::unique_ptr<zen::WebSocketServer> m_WebSocket;
+ std::unique_ptr<zen::AuthMgr> m_AuthMgr;
+ std::unique_ptr<zen::HttpAuthService> m_AuthService;
+ zen::HttpStatusService m_StatusService;
+ zen::HttpStatsService m_StatsService;
+ zen::GcManager m_GcManager;
+ zen::GcScheduler m_GcScheduler{m_GcManager};
+ std::unique_ptr<zen::CidStore> m_CidStore;
+ std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
+ zen::HttpTestService m_TestService;
+ zen::HttpTestingService m_TestingService;
+ std::unique_ptr<zen::HttpCidService> m_CidService;
+ zen::RefPtr<zen::ProjectStore> m_ProjectStore;
+ std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
+ std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
+ std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService;
+ std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
+ zen::HttpAdminService m_AdminService{m_GcScheduler};
+ zen::HttpHealthService m_HealthService;
+#if ZEN_WITH_COMPUTE_SERVICES
+ std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
+#endif // ZEN_WITH_COMPUTE_SERVICES
+ std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
+ std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService;
+
+ bool m_DebugOptionForcedCrash = false;
+ bool m_UseSentry = false;
+};
+
+void
+ZenServer::OnReady()
+{
+ m_ServerEntry->SignalReady();
+
+ if (m_IsReadyFunc)
+ {
+ m_IsReadyFunc();
+ }
+}
+
+void
+ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
+{
+ // Check root manifest to deal with schema versioning
+
+ bool WipeState = false;
+ std::string WipeReason = "Unspecified";
+
+ bool UpdateManifest = false;
+ std::filesystem::path ManifestPath = m_DataRoot / "root_manifest";
+ FileContents ManifestData = zen::ReadFile(ManifestPath);
+
+ if (ManifestData.ErrorCode)
+ {
+ if (ServerOptions.IsFirstRun)
+ {
+ ZEN_INFO("Initializing state at '{}'", m_DataRoot);
+
+ UpdateManifest = true;
+ }
+ else
+ {
+ WipeState = true;
+ WipeReason = fmt::format("No manifest present at '{}'", ManifestPath);
+ }
+ }
+ else
+ {
+ IoBuffer Manifest = ManifestData.Flatten();
+
+ if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All);
+ ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult));
+
+ WipeState = true;
+ WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult));
+ }
+ else
+ {
+ m_RootManifest = LoadCompactBinaryObject(Manifest);
+
+ const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+
+ if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
+ {
+ WipeState = true;
+ WipeReason = fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION);
+ }
+ }
+ }
+
+ // Release any open handles so we can overwrite the manifest
+ ManifestData = {};
+
+ // Handle any state wipe
+
+ if (WipeState)
+ {
+ ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason);
+
+ std::error_code Ec;
+ for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec})
+ {
+ if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs"))
+ {
+ ZEN_INFO("Deleting '{}'", DirEntry.path());
+
+ std::filesystem::remove_all(DirEntry.path(), Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message());
+ }
+ }
+ }
+
+ ZEN_INFO("Wiped all directories in data root");
+
+ UpdateManifest = true;
+ }
+
+ if (UpdateManifest)
+ {
+ // Write new manifest
+
+ const DateTime Now = DateTime::Now();
+
+ CbObjectWriter Cbo;
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid();
+
+ m_RootManifest = Cbo.Save();
+
+ WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer());
+ }
+}
+
+void
+ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
+{
+ using namespace std::literals;
+
+ ZEN_INFO("instantiating structured cache service");
+ m_CacheStore = std::make_unique<ZenCacheStore>(
+ m_GcManager,
+ ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true});
+
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
+ m_UpstreamCache->Initialize();
+
+ if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ // Zen upstream
+ {
+ std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls;
+ if (!UpstreamConfig.ZenConfig.Dns.empty())
+ {
+ for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns)
+ {
+ if (!Dns.empty())
+ {
+ const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls);
+ if (Err)
+ {
+ ZEN_ERROR("resolve FAILED, reason '{}'", Err.message());
+ }
+ }
+ }
+ }
+
+ std::erase_if(ZenUrls, [](const auto& Url) { return Url.empty(); });
+
+ if (!ZenUrls.empty())
+ {
+ const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name;
+
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint(
+ {.Name = ZenEndpointName,
+ .Urls = ZenUrls,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
+
+ m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+ }
+
+ // Jupiter upstream
+ if (UpstreamConfig.JupiterConfig.Url.empty() == false)
+ {
+ std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name;
+
+ auto Options =
+ zen::CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
+ .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
+
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint =
+ zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr);
+
+ m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ m_StructuredCacheService =
+ std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache);
+
+ m_Http->RegisterService(*m_StructuredCacheService);
+ m_Http->RegisterService(*m_UpstreamService);
+}
+
+#if ZEN_WITH_COMPUTE_SERVICES
+void
+ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
+{
+ ServerOptions;
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
+
+ // Horde compute upstream
+ if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false)
+ {
+ ZEN_INFO("instantiating compute service");
+
+ std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name;
+
+ auto ComputeOptions =
+ zen::CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.HordeConfig.Url,
+ .ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.AccessToken};
+
+ auto StorageOptions =
+ zen::CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl,
+ .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
+
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore,
+ ComputeOptions,
+ StorageOptions,
+ ComputeAuthConfig,
+ StorageAuthConfig,
+ *m_AuthMgr);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)");
+ }
+}
+#endif // ZEN_WITH_COMPUTE_SERVICES
+
+////////////////////////////////////////////////////////////////////////////////
+
+class ZenEntryPoint
+{
+public:
+ ZenEntryPoint(ZenServerOptions& ServerOptions);
+ ZenEntryPoint(const ZenEntryPoint&) = delete;
+ ZenEntryPoint& operator=(const ZenEntryPoint&) = delete;
+ int Run();
+
+private:
+ ZenServerOptions& m_ServerOptions;
+ zen::LockFile m_LockFile;
+};
+
+ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions)
+{
+}
+
+#if ZEN_USE_SENTRY
+static void
+SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata)
+{
+ char LogMessageBuffer[160];
+ std::string LogMessage;
+ const char* MessagePtr = LogMessageBuffer;
+
+ int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args);
+
+ if (n >= int(sizeof LogMessageBuffer))
+ {
+ LogMessage.resize(n + 1);
+
+ n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args);
+
+ MessagePtr = LogMessage.c_str();
+ }
+
+ switch (Level)
+ {
+ case SENTRY_LEVEL_DEBUG:
+ ConsoleLog().debug("sentry: {}", MessagePtr);
+ break;
+
+ case SENTRY_LEVEL_INFO:
+ ConsoleLog().info("sentry: {}", MessagePtr);
+ break;
+
+ case SENTRY_LEVEL_WARNING:
+ ConsoleLog().warn("sentry: {}", MessagePtr);
+ break;
+
+ case SENTRY_LEVEL_ERROR:
+ ConsoleLog().error("sentry: {}", MessagePtr);
+ break;
+
+ case SENTRY_LEVEL_FATAL:
+ ConsoleLog().critical("sentry: {}", MessagePtr);
+ break;
+ }
+}
+#endif
+
+int
+ZenEntryPoint::Run()
+{
+#if ZEN_USE_SENTRY
+ std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native");
+ int SentryErrorCode = 0;
+ if (m_ServerOptions.NoSentry == false)
+ {
+ sentry_options_t* SentryOptions = sentry_options_new();
+ sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284");
+ if (SentryDatabasePath.starts_with("\\\\?\\"))
+ {
+ SentryDatabasePath = SentryDatabasePath.substr(4);
+ }
+ sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str());
+ sentry_options_set_logger(SentryOptions, SentryLogFunction, this);
+ std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string();
+ if (SentryAttachmentPath.starts_with("\\\\?\\"))
+ {
+ SentryAttachmentPath = SentryAttachmentPath.substr(4);
+ }
+ sentry_options_add_attachment(SentryOptions, SentryAttachmentPath.c_str());
+ sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION);
+ // sentry_options_set_debug(SentryOptions, 1);
+
+ SentryErrorCode = sentry_init(SentryOptions);
+
+ auto SentrySink = spdlog::create<utils::sentry_sink>("sentry");
+ zen::logging::SetErrorLog(std::move(SentrySink));
+ }
+
+ auto _ = zen::MakeGuard([] {
+ zen::logging::SetErrorLog(std::shared_ptr<spdlog::logger>());
+ sentry_close();
+ });
+#endif
+
+ auto& ServerOptions = m_ServerOptions;
+
+ try
+ {
+ // Mutual exclusion and synchronization
+ ZenServerState ServerState;
+ ServerState.Initialize();
+ ServerState.Sweep();
+
+ ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort);
+
+ if (Entry)
+ {
+ if (ServerOptions.OwnerPid)
+ {
+ ConsoleLog().info(
+ "Looks like there is already a process listening to this port {} (pid: {}), attaching owner pid {} to running instance",
+ ServerOptions.BasePort,
+ Entry->Pid,
+ ServerOptions.OwnerPid);
+
+ Entry->AddSponsorProcess(ServerOptions.OwnerPid);
+
+ std::exit(0);
+ }
+ else
+ {
+ ConsoleLog().warn("Exiting since there is already a process listening to port {} (pid: {})",
+ ServerOptions.BasePort,
+ Entry->Pid);
+ std::exit(1);
+ }
+ }
+
+ std::error_code Ec;
+
+ std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock";
+
+ bool IsReady = false;
+
+ auto MakeLockData = [&] {
+ CbObjectWriter Cbo;
+ Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort
+ << "session_id" << GetSessionId() << "ready" << IsReady;
+ return Cbo.Save();
+ };
+
+ m_LockFile.Create(LockFilePath, MakeLockData(), Ec);
+
+ if (Ec)
+ {
+ ConsoleLog().warn("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message());
+
+ std::exit(99);
+ }
+
+ InitializeLogging(ServerOptions);
+
+#if ZEN_USE_SENTRY
+ if (m_ServerOptions.NoSentry == false)
+ {
+ if (SentryErrorCode == 0)
+ {
+ ZEN_INFO("sentry initialized");
+ }
+ else
+ {
+ ZEN_WARN("sentry_init returned failure! (error code: {})", SentryErrorCode);
+ }
+ }
+#endif
+
+ MaximizeOpenFileCount();
+
+ ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath);
+
+ ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL);
+
+ Entry = ServerState.Register(ServerOptions.BasePort);
+
+ if (ServerOptions.OwnerPid)
+ {
+ Entry->AddSponsorProcess(ServerOptions.OwnerPid);
+ }
+
+ ZenServer Server;
+ Server.SetDataRoot(ServerOptions.DataDir);
+ Server.SetContentRoot(ServerOptions.ContentDir);
+ Server.SetTestMode(ServerOptions.IsTest);
+ Server.SetDedicatedMode(ServerOptions.IsDedicated);
+
+ int EffectiveBasePort = Server.Initialize(ServerOptions, Entry);
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<zen::NamedEvent> ShutdownEvent;
+
+ zen::ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName);
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal received");
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ IsReady = true;
+
+ m_LockFile.Update(MakeLockData(), Ec);
+
+ if (!ServerOptions.ChildId.empty())
+ {
+ zen::NamedEvent ParentEvent{ServerOptions.ChildId};
+ ParentEvent.Set();
+ }
+ });
+
+ Server.Run();
+ Server.Cleanup();
+
+ ShutdownEvent->Set();
+ ShutdownThread->join();
+ }
+ catch (std::exception& e)
+ {
+ SPDLOG_CRITICAL("Caught exception in main: {}", e.what());
+ }
+
+ ShutdownLogging();
+
+ return 0;
+}
+
+} // namespace zen
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if ZEN_PLATFORM_WINDOWS
+
+class ZenWindowsService : public WindowsService
+{
+public:
+ ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {}
+
+ ZenWindowsService(const ZenWindowsService&) = delete;
+ ZenWindowsService& operator=(const ZenWindowsService&) = delete;
+
+ virtual int Run() override;
+
+private:
+ zen::ZenEntryPoint m_EntryPoint;
+};
+
+int
+ZenWindowsService::Run()
+{
+ return m_EntryPoint.Run();
+}
+
+#endif // ZEN_PLATFORM_WINDOWS
+
+////////////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+int
+test_main(int argc, char** argv)
+{
+ zen::zencore_forcelinktests();
+ zen::zenhttp_forcelinktests();
+ zen::zenstore_forcelinktests();
+ zen::z$_forcelink();
+ zen::z$service_forcelink();
+
+ zen::logging::InitializeLogging();
+ spdlog::set_level(spdlog::level::debug);
+
+ zen::MaximizeOpenFileCount();
+
+ return ZEN_RUN_TESTS(argc, argv);
+}
+#endif
+
+int
+main(int argc, char* argv[])
+{
+ using namespace zen;
+
+#if ZEN_USE_MIMALLOC
+ mi_version();
+#endif
+
+#if ZEN_WITH_TESTS
+ if (argc >= 2)
+ {
+ if (argv[1] == "test"sv)
+ {
+ return test_main(argc, argv);
+ }
+ }
+#endif
+
+ try
+ {
+ ZenServerOptions ServerOptions;
+ ParseCliOptions(argc, argv, ServerOptions);
+
+ if (!std::filesystem::exists(ServerOptions.DataDir))
+ {
+ ServerOptions.IsFirstRun = true;
+ std::filesystem::create_directories(ServerOptions.DataDir);
+ }
+
+#if ZEN_WITH_TRACE
+ if (ServerOptions.TraceHost.size())
+ {
+ TraceInit(ServerOptions.TraceHost.c_str(), TraceType::Network);
+ }
+ else if (ServerOptions.TraceFile.size())
+ {
+ TraceInit(ServerOptions.TraceFile.c_str(), TraceType::File);
+ }
+ else
+ {
+ TraceInit(nullptr, TraceType::None);
+ }
+#endif // ZEN_WITH_TRACE
+
+#if ZEN_PLATFORM_WINDOWS
+ if (ServerOptions.InstallService)
+ {
+ WindowsService::Install();
+
+ std::exit(0);
+ }
+
+ if (ServerOptions.UninstallService)
+ {
+ WindowsService::Delete();
+
+ std::exit(0);
+ }
+
+ ZenWindowsService App(ServerOptions);
+ return App.ServiceMain();
+#else
+ if (ServerOptions.InstallService || ServerOptions.UninstallService)
+ {
+ throw std::runtime_error("Service mode is not supported on this platform");
+ }
+
+ ZenEntryPoint App(ServerOptions);
+ return App.Run();
+#endif // ZEN_PLATFORM_WINDOWS
+ }
+ catch (std::exception& Ex)
+ {
+ fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what());
+
+ return 1;
+ }
+}