diff options
| author | Stefan Boberg <[email protected]> | 2023-10-06 10:45:48 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-06 10:45:48 +0200 |
| commit | fb70324d37282910d7fa3047f4ec290d0c5a94b1 (patch) | |
| tree | a1bc82fcfdb96eb5b461742b613fcbb63f816a54 /src/zenserver/zenserver.cpp | |
| parent | reject known bad bucket names in structured cache (#452) (diff) | |
| download | zen-fb70324d37282910d7fa3047f4ec290d0c5a94b1.tar.xz zen-fb70324d37282910d7fa3047f4ec290d0c5a94b1.zip | |
zenserver project restructuring (#442)
Diffstat (limited to 'src/zenserver/zenserver.cpp')
| -rw-r--r-- | src/zenserver/zenserver.cpp | 1487 |
1 files changed, 410 insertions, 1077 deletions
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 0555fab79..a8273f6c9 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -1,5 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include "zenserver.h" + +#include "sentryintegration.h" + #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/config.h> @@ -34,13 +38,6 @@ # include <pwd.h> #endif -#if ZEN_USE_MIMALLOC -ZEN_THIRD_PARTY_INCLUDES_START -# include <mimalloc-new-delete.h> -# include <mimalloc.h> -ZEN_THIRD_PARTY_INCLUDES_END -#endif - ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> #include <asio.hpp> @@ -48,181 +45,23 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END #include <exception> -#include <list> -#include <optional> -#include <regex> #include <set> -#include <unordered_map> - -////////////////////////////////////////////////////////////////////////// -// We don't have any doctest code in this file but this is needed to bring -// in some shared code into the executable - -#if ZEN_WITH_TESTS -# define ZEN_TEST_WITH_RUNNER 1 -# include <zencore/testing.h> -#endif ////////////////////////////////////////////////////////////////////////// #include "config.h" #include "diag/logging.h" - -#if ZEN_PLATFORM_WINDOWS -# include "windows/service.h" -#endif - -////////////////////////////////////////////////////////////////////////// -// Sentry -// - -#if !defined(ZEN_USE_SENTRY) -# if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64 -// vcpkg's sentry-native port does not support Arm on Mac. -# define ZEN_USE_SENTRY 0 -# else -# define ZEN_USE_SENTRY 1 -# endif -#endif - -#if ZEN_USE_SENTRY -# define SENTRY_BUILD_STATIC 1 -ZEN_THIRD_PARTY_INCLUDES_START -# include <sentry.h> -# include <spdlog/sinks/base_sink.h> -ZEN_THIRD_PARTY_INCLUDES_END - -// Sentry currently does not automatically add all required Windows -// libraries to the linker when consumed via vcpkg - -# if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "sentry.lib") -# pragma comment(lib, "dbghelp.lib") -# pragma comment(lib, "winhttp.lib") -# pragma comment(lib, "version.lib") -# endif -#endif - -////////////////////////////////////////////////////////////////////////// -// Services -// - -#include <zenhttp/auth/authmgr.h> -#include <zenhttp/auth/authservice.h> -#include <zenhttp/diagsvcs.h> -#include <zenhttp/httpstats.h> -#include <zenhttp/httpstatus.h> -#include <zenhttp/httptest.h> -#include <zenstore/gc.h> -#include "admin/admin.h" -#include "cache/httpstructuredcache.h" -#include "cache/structuredcachestore.h" -#include "compute/function.h" -#include "frontend/frontend.h" -#include "httpcidstore.h" -#include "objectstore/objectstore.h" -#include "projectstore/httpprojectstore.h" -#include "projectstore/projectstore.h" -#include "upstream/upstream.h" -#include "vfs/vfsservice.h" - -#define ZEN_APP_NAME "Zen store" +#include "sentryintegration.h" namespace zen { -using namespace std::literals; - namespace utils { - static std::atomic_uint32_t SignalCounter[NSIG] = {0}; - - static void SignalCallbackHandler(int SigNum) - { - if (SigNum >= 0 && SigNum < NSIG) - { - SignalCounter[SigNum].fetch_add(1); - } - } - -#if ZEN_USE_SENTRY - class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex> - { - public: - sentry_sink() {} - - protected: - static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG, - SENTRY_LEVEL_DEBUG, - SENTRY_LEVEL_INFO, - SENTRY_LEVEL_WARNING, - SENTRY_LEVEL_ERROR, - SENTRY_LEVEL_FATAL, - SENTRY_LEVEL_DEBUG}; - - void sink_it_(const spdlog::details::log_msg& msg) override - { - try - { - std::string Message = - fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname); - sentry_value_t event = sentry_value_new_message_event( - /* level */ MapToSentryLevel[msg.level], - /* logger */ nullptr, - /* message */ Message.c_str()); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - catch (std::exception&) - { - // If our logging with Message formatting fails we do a non-allocating version and just post the msg.payload raw - char TmpBuffer[256]; - size_t MaxCopy = Min<size_t>(msg.payload.size(), size_t(255)); - memcpy(TmpBuffer, msg.payload.data(), MaxCopy); - TmpBuffer[MaxCopy] = '\0'; - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ TmpBuffer); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - } - void flush_() override {} - }; + extern std::atomic_uint32_t SignalCounter[NSIG]; +} - struct SentryAssertImpl : AssertImpl - { - ZEN_FORCENOINLINE ZEN_DEBUG_SECTION SentryAssertImpl() : PrevAssertImpl(CurrentAssertImpl) { CurrentAssertImpl = this; } - virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~SentryAssertImpl() { CurrentAssertImpl = PrevAssertImpl; } - virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename, - int LineNumber, - const char* FunctionName, - const char* Msg) - { - try - { - std::string Message = fmt::format("ASSERT {}:({}) [{}]\n\"{}\"", Filename, LineNumber, FunctionName, Msg); - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ Message.c_str()); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - catch (std::exception&) - { - // If our logging with Message formatting fails we do a non-allocating version and just post the Msg raw - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ Msg); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - } - AssertImpl* PrevAssertImpl; - }; -#endif +using namespace std::literals; +namespace utils { asio::error_code ResolveHostname(asio::io_context& Ctx, std::string_view Host, std::string_view DefaultPort, @@ -253,561 +92,232 @@ namespace utils { } } // namespace utils -class ZenServer : public IHttpStatusProvider -{ -public: - ~ZenServer() {} - - int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) - { - m_UseSentry = ServerOptions.NoSentry == false; - m_ServerEntry = ServerEntry; - m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; - const int ParentPid = ServerOptions.OwnerPid; - - if (ParentPid) - { - zen::ProcessHandle OwnerProcess; - OwnerProcess.Initialize(ParentPid); - - if (!OwnerProcess.IsValid()) - { - ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); - - // If the pid is not reachable should we just shut down immediately? the intended owner process - // could have been killed or somehow crashed already - } - else - { - ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); - } - - m_ProcessMonitor.AddPid(ParentPid); - } - - // Initialize/check mutex based on base port - - std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); - - if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) - { - throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str()); - } - - InitializeState(ServerOptions); - - m_JobQueue = MakeJobQueue(8, "backgroundjobs"); - - m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, - .AbsLogPath = ServerOptions.AbsLogFile, - .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass), - .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); - - // Ok so now we're configured, let's kick things off - - m_Http = zen::CreateHttpServer(ServerOptions.HttpServerConfig); - int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); - - // Setup authentication manager - { - std::string EncryptionKey = ServerOptions.EncryptionKey; - - if (EncryptionKey.empty()) - { - EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; - - ZEN_WARN("using default encryption key"); - } - - std::string EncryptionIV = ServerOptions.EncryptionIV; - - if (EncryptionIV.empty()) - { - EncryptionIV = "0123456789abcdef"; - - ZEN_WARN("using default encryption initialization vector"); - } - - m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", - .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), - .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); - - for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) - { - m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); - } - } - - m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr); - m_Http->RegisterService(*m_AuthService); - - m_Http->RegisterService(m_HealthService); - m_Http->RegisterService(m_StatsService); - m_Http->RegisterService(m_StatusService); - m_StatusService.RegisterHandler("status", *this); +////////////////////////////////////////////////////////////////////////// - // Initialize storage and services +ZenServer::ZenServer() +{ +} - ZEN_INFO("initializing storage"); +ZenServer::~ZenServer() +{ +} - zen::CidStoreConfiguration Config; - Config.RootDirectory = m_DataRoot / "cas"; +void +ZenServer::OnReady() +{ + m_ServerEntry->SignalReady(); - m_CidStore = std::make_unique<zen::CidStore>(m_GcManager); - m_CidStore->Initialize(Config); - m_CidService.reset(new zen::HttpCidService{*m_CidStore}); + if (m_IsReadyFunc) + { + m_IsReadyFunc(); + } +} - ZEN_INFO("instantiating project service"); +int +ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) +{ + m_UseSentry = ServerOptions.NoSentry == false; + m_ServerEntry = ServerEntry; + m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; + const int ParentPid = ServerOptions.OwnerPid; - m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); - m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); + if (ParentPid) + { + ProcessHandle OwnerProcess; + OwnerProcess.Initialize(ParentPid); -#if ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.ComputeServiceEnabled) + if (!OwnerProcess.IsValid()) { - InitializeCompute(ServerOptions); - } - else - { - ZEN_INFO("NOT instantiating compute services"); - } -#endif // ZEN_WITH_COMPUTE_SERVICES + ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); - if (ServerOptions.StructuredCacheConfig.Enabled) - { - InitializeStructuredCache(ServerOptions); + // If the pid is not reachable should we just shut down immediately? the intended owner process + // could have been killed or somehow crashed already } else { - ZEN_INFO("NOT instantiating structured cache service"); + ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } - m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics - -#if ZEN_WITH_TESTS - m_Http->RegisterService(m_TestingService); -#endif + m_ProcessMonitor.AddPid(ParentPid); + } - if (m_HttpProjectService) - { - m_Http->RegisterService(*m_HttpProjectService); - } + // Initialize/check mutex based on base port - m_Http->RegisterService(*m_CidService); + std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort); -#if ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.ComputeServiceEnabled) - { - if (m_HttpFunctionService != nullptr) - { - m_Http->RegisterService(*m_HttpFunctionService); - } - } -#endif // ZEN_WITH_COMPUTE_SERVICES + if (NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) + { + throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str()); + } - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + InitializeState(ServerOptions); - if (m_FrontendService) - { - m_Http->RegisterService(*m_FrontendService); - } + m_JobQueue = MakeJobQueue(8, "backgroundjobs"); - if (ServerOptions.ObjectStoreEnabled) - { - ObjectStoreConfig ObjCfg; - ObjCfg.RootDirectory = m_DataRoot / "obj"; - ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort); + m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot, + .AbsLogPath = ServerOptions.AbsLogFile, + .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass), + .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)}); - for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) - { - ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; - NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; - ObjCfg.Buckets.push_back(std::move(NewBucket)); - } + // Ok so now we're configured, let's kick things off - m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg)); - m_Http->RegisterService(*m_ObjStoreService); - } + m_Http = CreateHttpServer(ServerOptions.HttpServerConfig); + int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); - m_VfsService = std::make_unique<VfsService>(); - m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); - m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); - m_Http->RegisterService(*m_VfsService); - - ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s", - ServerOptions.GcConfig.Enabled, - ServerOptions.GcConfig.IntervalSeconds, - ServerOptions.GcConfig.LightweightIntervalSeconds); - zen::GcSchedulerConfig GcConfig{ - .RootDirectory = m_DataRoot / "gc", - .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), - .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), - .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), - .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), - .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, - .Enabled = ServerOptions.GcConfig.Enabled, - .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, - .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, - .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, - .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)}; - m_GcScheduler.Initialize(GcConfig); - - // Create and register admin interface last to make sure all is properly initialized - m_AdminService = - std::make_unique<HttpAdminService>(m_GcScheduler, - *m_JobQueue, - *m_CacheStore, - HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, - .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", - .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}); - m_Http->RegisterService(*m_AdminService); - - return EffectiveBasePort; - } - - void InitializeState(const ZenServerOptions& ServerOptions); - void InitializeStructuredCache(const ZenServerOptions& ServerOptions); - void InitializeCompute(const ZenServerOptions& ServerOptions); - - void Run() + // Setup authentication manager { - // This is disabled for now, awaiting better scheduling - // - // ScrubStorage(); + std::string EncryptionKey = ServerOptions.EncryptionKey; - if (m_ProcessMonitor.IsActive()) + if (EncryptionKey.empty()) { - EnqueueTimer(); - } + EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; - if (!m_TestMode) - { - ZEN_INFO("__________ _________ __ "); - ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); - ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); - ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); - ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); - ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); + ZEN_WARN("using default encryption key"); } - ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); + std::string EncryptionIV = ServerOptions.EncryptionIV; -#if ZEN_USE_SENTRY - ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); - if (m_UseSentry) + if (EncryptionIV.empty()) { - sentry_clear_modulecache(); - } -#endif + EncryptionIV = "0123456789abcdef"; - if (m_DebugOptionForcedCrash) - { - ZEN_DEBUG_BREAK(); + ZEN_WARN("using default encryption initialization vector"); } - const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; - - SetNewState(kRunning); - - OnReady(); - - m_Http->Run(IsInteractiveMode); - - SetNewState(kShuttingDown); - - ZEN_INFO(ZEN_APP_NAME " exiting"); + m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth", + .EncryptionKey = AesKey256Bit::FromString(EncryptionKey), + .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)}); - Flush(); - } - - void RequestExit(int ExitCode) - { - RequestApplicationExit(ExitCode); - if (m_Http) + for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders) { - m_Http->RequestExit(); + m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId}); } } - void Cleanup() - { - ZEN_INFO(ZEN_APP_NAME " cleaning up"); - try - { - m_IoContext.stop(); - if (m_IoRunner.joinable()) - { - m_IoRunner.join(); - } + m_AuthService = std::make_unique<HttpAuthService>(*m_AuthMgr); + m_Http->RegisterService(*m_AuthService); - if (m_Http) - { - m_Http->Close(); - } - if (m_JobQueue) - { - m_JobQueue->Stop(); - } + m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_StatusService); + m_StatusService.RegisterHandler("status", *this); - m_GcScheduler.Shutdown(); - m_AdminService.reset(); - m_VfsService.reset(); - m_ObjStoreService.reset(); - m_FrontendService.reset(); + // Initialize storage and services - m_StructuredCacheService.reset(); - m_UpstreamService.reset(); - m_UpstreamCache.reset(); - m_CacheStore = {}; + ZEN_INFO("initializing storage"); -#if ZEN_WITH_COMPUTE_SERVICES - m_HttpFunctionService.reset(); -#endif // ZEN_WITH_COMPUTE_SERVICES + CidStoreConfiguration Config; + Config.RootDirectory = m_DataRoot / "cas"; - m_HttpProjectService.reset(); - m_ProjectStore = {}; - m_CidService.reset(); - m_CidStore.reset(); - m_AuthService.reset(); - m_AuthMgr.reset(); - m_Http = {}; - m_JobQueue.reset(); - } - catch (std::exception& Ex) - { - ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); - } - } + m_CidStore = std::make_unique<CidStore>(m_GcManager); + m_CidStore->Initialize(Config); + m_CidService.reset(new HttpCidService{*m_CidStore}); - void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } - void SetTestMode(bool State) { m_TestMode = State; } - void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } - void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + ZEN_INFO("instantiating project service"); - std::function<void()> m_IsReadyFunc; - void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } - void OnReady(); + m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); + m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); - void EnsureIoRunner() +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeServiceEnabled) { - if (!m_IoRunner.joinable()) - { - m_IoRunner = std::thread{[this] { - zen::SetCurrentThreadName("timer_io"); - m_IoContext.run(); - }}; - } + InitializeCompute(ServerOptions); } - - void EnqueueTimer() + else { - m_PidCheckTimer.expires_after(std::chrono::seconds(1)); - m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); - - EnsureIoRunner(); + ZEN_INFO("NOT instantiating compute services"); } +#endif // ZEN_WITH_COMPUTE_SERVICES - void EnqueueStateMarkerTimer() + if (ServerOptions.StructuredCacheConfig.Enabled) { - m_StateMakerTimer.expires_after(std::chrono::seconds(5)); - m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); - EnsureIoRunner(); + InitializeStructuredCache(ServerOptions); } - - void EnqueueSigIntTimer() + else { - m_SigIntTimer.expires_after(std::chrono::milliseconds(500)); - m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); }); - EnsureIoRunner(); + ZEN_INFO("NOT instantiating structured cache service"); } - void CheckStateMarker() - { - std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; - try - { - if (!std::filesystem::exists(StateMarkerPath)) - { - ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); - RequestExit(1); - return; - } - } - catch (std::exception& Ex) - { - ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); - RequestExit(1); - return; - } - EnqueueStateMarkerTimer(); - } + m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics - void CheckSigInt() - { - if (utils::SignalCounter[SIGINT] > 0) - { - ZEN_INFO("SIGINT triggered (Ctrl+C), exiting"); - RequestExit(128 + SIGINT); - return; - } - EnqueueSigIntTimer(); - } +#if ZEN_WITH_TESTS + m_Http->RegisterService(m_TestingService); +#endif - void CheckOwnerPid() + if (m_HttpProjectService) { - // Pick up any new "owner" processes - - std::set<uint32_t> AddedPids; + m_Http->RegisterService(*m_HttpProjectService); + } - for (auto& PidEntry : m_ServerEntry->SponsorPids) - { - if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) - { - if (PidEntry.compare_exchange_strong(ThisPid, 0)) - { - if (AddedPids.insert(ThisPid).second) - { - m_ProcessMonitor.AddPid(ThisPid); + m_Http->RegisterService(*m_CidService); - ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); - } - } - } - } - - if (m_ProcessMonitor.IsRunning()) +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeServiceEnabled) + { + if (m_HttpFunctionService != nullptr) { - EnqueueTimer(); + m_Http->RegisterService(*m_HttpFunctionService); } - else - { - ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); - - RequestExit(0); - } - } - - void ScrubStorage() - { - Stopwatch Timer; - ZEN_INFO("Storage validation STARTING"); - - WorkerThreadPool ThreadPool{1}; - ScrubContext Ctx{ThreadPool}; - m_CidStore->ScrubStorage(Ctx); - m_ProjectStore->ScrubStorage(Ctx); - m_StructuredCacheService->ScrubStorage(Ctx); - - const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); - - ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", - NiceTimeSpanMs(ElapsedTimeMs), - NiceBytes(Ctx.ScrubbedBytes()), - Ctx.ScrubbedChunks(), - NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } +#endif // ZEN_WITH_COMPUTE_SERVICES - void Flush() - { - if (m_CidStore) - m_CidStore->Flush(); - - if (m_StructuredCacheService) - m_StructuredCacheService->Flush(); - - if (m_ProjectStore) - m_ProjectStore->Flush(); - } + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); - virtual void HandleStatusRequest(HttpServerRequest& Request) override + if (m_FrontendService) { - CbObjectWriter Cbo; - Cbo << "ok" << true; - Cbo << "state" << ToString(m_CurrentState); - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + m_Http->RegisterService(*m_FrontendService); } -private: - ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - CbObject m_RootManifest; - std::filesystem::path m_DataRoot; - std::filesystem::path m_ContentRoot; - std::thread m_IoRunner; - asio::io_context m_IoContext; - asio::steady_timer m_PidCheckTimer{m_IoContext}; - asio::steady_timer m_StateMakerTimer{m_IoContext}; - asio::steady_timer m_SigIntTimer{m_IoContext}; - zen::ProcessMonitor m_ProcessMonitor; - zen::NamedMutex m_ServerMutex; - - enum ServerState + if (ServerOptions.ObjectStoreEnabled) { - kInitializing, - kRunning, - kShuttingDown - } m_CurrentState = kInitializing; + ObjectStoreConfig ObjCfg; + ObjCfg.RootDirectory = m_DataRoot / "obj"; + ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort); - inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } - - std::string_view ToString(ServerState Value) - { - switch (Value) + for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets) { - case kInitializing: - return "initializing"sv; - case kRunning: - return "running"sv; - case kShuttingDown: - return "shutdown"sv; - default: - return "unknown"sv; + ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name}; + NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory; + ObjCfg.Buckets.push_back(std::move(NewBucket)); } + + m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg)); + m_Http->RegisterService(*m_ObjStoreService); } - zen::Ref<zen::HttpServer> m_Http; - std::unique_ptr<zen::AuthMgr> m_AuthMgr; - std::unique_ptr<zen::HttpAuthService> m_AuthService; - zen::HttpStatusService m_StatusService; - zen::HttpStatsService m_StatsService; - zen::GcManager m_GcManager; - zen::GcScheduler m_GcScheduler{m_GcManager}; - std::unique_ptr<zen::CidStore> m_CidStore; - Ref<zen::ZenCacheStore> m_CacheStore; - zen::HttpTestService m_TestService; -#if ZEN_WITH_TESTS - zen::HttpTestingService m_TestingService; -#endif - std::unique_ptr<zen::HttpCidService> m_CidService; - zen::RefPtr<zen::ProjectStore> m_ProjectStore; - std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; - std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; - std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService; - std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - zen::HttpHealthService m_HealthService; -#if ZEN_WITH_COMPUTE_SERVICES - std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; -#endif // ZEN_WITH_COMPUTE_SERVICES - std::unique_ptr<zen::HttpFrontendService> m_FrontendService; - std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService; - std::unique_ptr<zen::VfsService> m_VfsService; - std::unique_ptr<JobQueue> m_JobQueue; - std::unique_ptr<zen::HttpAdminService> m_AdminService; + m_VfsService = std::make_unique<VfsService>(); + m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); + m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); + m_Http->RegisterService(*m_VfsService); - bool m_DebugOptionForcedCrash = false; - bool m_UseSentry = false; -}; + ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s", + ServerOptions.GcConfig.Enabled, + ServerOptions.GcConfig.IntervalSeconds, + ServerOptions.GcConfig.LightweightIntervalSeconds); + GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), + .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), + .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, + .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, + .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, + .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, + .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)}; + m_GcScheduler.Initialize(GcConfig); -void -ZenServer::OnReady() -{ - m_ServerEntry->SignalReady(); + // Create and register admin interface last to make sure all is properly initialized + m_AdminService = + std::make_unique<HttpAdminService>(m_GcScheduler, + *m_JobQueue, + *m_CacheStore, + HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, + .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", + .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}); + m_Http->RegisterService(*m_AdminService); - if (m_IsReadyFunc) - { - m_IsReadyFunc(); - } + return EffectiveBasePort; } void @@ -822,7 +332,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; - FileContents ManifestData = zen::ReadFile(ManifestPath); + FileContents ManifestData = ReadFile(ManifestPath); if (ManifestData.ErrorCode) { @@ -938,7 +448,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; - zen::UpstreamCacheOptions UpstreamOptions; + UpstreamCacheOptions UpstreamOptions; UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; @@ -947,7 +457,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } - m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamCache = UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore); m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); @@ -962,7 +472,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { if (!Dns.empty()) { - const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); + const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); if (Err) { ZEN_ERROR("resolve FAILED, reason '{}'", Err.message()); @@ -977,7 +487,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint( + std::unique_ptr<UpstreamEndpoint> ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint( {.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), @@ -992,22 +502,20 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; - auto Options = - zen::CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.JupiterConfig.Url, - .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, - .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + auto Options = CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, + .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, - .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, - .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, - .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, - .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; + auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; - std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = - zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); + std::unique_ptr<UpstreamEndpoint> JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr); m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } @@ -1039,37 +547,37 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; auto ComputeOptions = - zen::CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.HordeConfig.Url, - .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - - auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, - .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, - .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, - .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, - .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; + CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.HordeConfig.Url, + .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto ComputeAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; auto StorageOptions = - zen::CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, - .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - - auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, - .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, - .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, - .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, - .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; - - m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore, - ComputeOptions, - StorageOptions, - ComputeAuthConfig, - StorageAuthConfig, - *m_AuthMgr); + CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, + .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto StorageAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; + + m_HttpFunctionService = std::make_unique<HttpFunctionService>(*m_CidStore, + ComputeOptions, + StorageOptions, + ComputeAuthConfig, + StorageAuthConfig, + *m_AuthMgr); } else { @@ -1078,456 +586,281 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) } #endif // ZEN_WITH_COMPUTE_SERVICES -//////////////////////////////////////////////////////////////////////////////// - -class ZenEntryPoint -{ -public: - ZenEntryPoint(ZenServerOptions& ServerOptions); - ZenEntryPoint(const ZenEntryPoint&) = delete; - ZenEntryPoint& operator=(const ZenEntryPoint&) = delete; - int Run(); - -private: - ZenServerOptions& m_ServerOptions; - zen::LockFile m_LockFile; -}; - -ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions) -{ -} - -#if ZEN_USE_SENTRY -static void -SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata) +void +ZenServer::Run() { - char LogMessageBuffer[160]; - std::string LogMessage; - const char* MessagePtr = LogMessageBuffer; - - int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args); + // This is disabled for now, awaiting better scheduling + // + // ScrubStorage(); - if (n >= int(sizeof LogMessageBuffer)) + if (m_ProcessMonitor.IsActive()) { - LogMessage.resize(n + 1); - - n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args); - - MessagePtr = LogMessage.c_str(); + EnqueueTimer(); } - switch (Level) + if (!m_TestMode) { - case SENTRY_LEVEL_DEBUG: - ConsoleLog().debug("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_INFO: - ConsoleLog().info("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_WARNING: - ConsoleLog().warn("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_ERROR: - ConsoleLog().error("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_FATAL: - ConsoleLog().critical("sentry: {}", MessagePtr); - break; + ZEN_INFO("__________ _________ __ "); + ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ "); + ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ "); + ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ "); + ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >"); + ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } -} -#endif - -int -ZenEntryPoint::Run() -{ - zen::SetCurrentThreadName("main"); + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId()); #if ZEN_USE_SENTRY - int SentryErrorCode = 0; - std::unique_ptr<zen::utils::SentryAssertImpl> SentryAssert; - std::string SentryUserName; - - if (m_ServerOptions.NoSentry == false) + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) { - std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native"); - if (SentryDatabasePath.starts_with("\\\\?\\")) - { - SentryDatabasePath = SentryDatabasePath.substr(4); - } - sentry_options_t* SentryOptions = sentry_options_new(); - sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); - sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); - sentry_options_set_logger(SentryOptions, SentryLogFunction, this); - std::string SentryAttachmentPath = PathToUtf8(m_ServerOptions.AbsLogFile); - if (SentryAttachmentPath.starts_with("\\\\?\\")) - { - SentryAttachmentPath = SentryAttachmentPath.substr(4); - } - sentry_options_add_attachment(SentryOptions, SentryAttachmentPath.c_str()); - sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION); - - // sentry_options_set_debug(SentryOptions, 1); - - SentryErrorCode = sentry_init(SentryOptions); - - if (SentryErrorCode == 0) - { - if (m_ServerOptions.SentryAllowPII) - { -# if ZEN_PLATFORM_WINDOWS - CHAR UserNameBuffer[511 + 1]; - DWORD UserNameLength = sizeof(UserNameBuffer) / sizeof(CHAR); - BOOL OK = GetUserNameA(UserNameBuffer, &UserNameLength); - if (OK && UserNameLength) - { - SentryUserName = std::string(UserNameBuffer, UserNameLength - 1); - } -# endif // ZEN_PLATFORM_WINDOWS -# if (ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC) - uid_t uid = geteuid(); - struct passwd* pw = getpwuid(uid); - if (pw) - { - SentryUserName = std::string(pw->pw_name); - } -# endif - sentry_value_t SentryUserObject = sentry_value_new_object(); - sentry_value_set_by_key(SentryUserObject, "id", sentry_value_new_string(SentryUserName.c_str())); - sentry_value_set_by_key(SentryUserObject, "username", sentry_value_new_string(SentryUserName.c_str())); - sentry_value_set_by_key(SentryUserObject, "ip_address", sentry_value_new_string("{{auto}}")); - sentry_set_user(SentryUserObject); - } - - auto SentrySink = spdlog::create<utils::sentry_sink>("sentry"); - zen::logging::SetErrorLog(std::move(SentrySink)); - - SentryAssert = std::make_unique<zen::utils::SentryAssertImpl>(); - } + SentryIntegration::ClearCaches(); } - - auto _ = zen::MakeGuard([&SentryAssert, SentryErrorCode] { - if (SentryErrorCode == 0) - { - zen::logging::SetErrorLog(std::shared_ptr<spdlog::logger>()); - SentryAssert.reset(); - sentry_close(); - } - }); #endif - auto& ServerOptions = m_ServerOptions; - - try + if (m_DebugOptionForcedCrash) { - // Mutual exclusion and synchronization - ZenServerState ServerState; - ServerState.Initialize(); - ServerState.Sweep(); - - ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort); - - if (Entry) - { - if (ServerOptions.OwnerPid) - { - ZEN_INFO( - "Looks like there is already a process listening to this port {} (pid: {}), attaching owner pid {} to running instance", - ServerOptions.BasePort, - Entry->Pid.load(), - ServerOptions.OwnerPid); - - Entry->AddSponsorProcess(ServerOptions.OwnerPid); - - std::exit(0); - } - else - { - ZEN_WARN("Exiting since there is already a process listening to port {} (pid: {})", - ServerOptions.BasePort, - Entry->Pid.load()); - std::exit(1); - } - } + ZEN_DEBUG_BREAK(); + } - std::error_code Ec; + const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode; - std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock"; + SetNewState(kRunning); - bool IsReady = false; + OnReady(); - auto MakeLockData = [&] { - CbObjectWriter Cbo; - Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort - << "session_id" << GetSessionId() << "ready" << IsReady; - return Cbo.Save(); - }; + m_Http->Run(IsInteractiveMode); - m_LockFile.Create(LockFilePath, MakeLockData(), Ec); + SetNewState(kShuttingDown); - if (Ec) - { - ZEN_WARN("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message()); + ZEN_INFO(ZEN_APP_NAME " exiting"); - std::exit(99); - } + Flush(); +} - InitializeLogging(ServerOptions); +void +ZenServer::RequestExit(int ExitCode) +{ + RequestApplicationExit(ExitCode); + if (m_Http) + { + m_Http->RequestExit(); + } +} -#if ZEN_USE_SENTRY - if (m_ServerOptions.NoSentry == false) +void +ZenServer::Cleanup() +{ + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) { - if (SentryErrorCode == 0) - { - if (m_ServerOptions.SentryAllowPII) - { - ZEN_INFO("sentry initialized, username: '{}'", SentryUserName); - } - else - { - ZEN_INFO("sentry initialized with anonymous reports"); - } - } - else - { - ZEN_WARN("sentry_init returned failure! (error code: {})", SentryErrorCode); - } + m_IoRunner.join(); } -#endif - - MaximizeOpenFileCount(); - - ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath); - ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL); - - Entry = ServerState.Register(ServerOptions.BasePort); - - if (ServerOptions.OwnerPid) + if (m_Http) { - Entry->AddSponsorProcess(ServerOptions.OwnerPid); + m_Http->Close(); } - - ZenServer Server; - Server.SetDataRoot(ServerOptions.DataDir); - Server.SetContentRoot(ServerOptions.ContentDir); - Server.SetTestMode(ServerOptions.IsTest); - Server.SetDedicatedMode(ServerOptions.IsDedicated); - - auto ServerCleanup = zen::MakeGuard([&Server] { Server.Cleanup(); }); - - int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); - - Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); - if (EffectiveBasePort != ServerOptions.BasePort) + if (m_JobQueue) { - ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); - ServerOptions.BasePort = EffectiveBasePort; + m_JobQueue->Stop(); } - std::unique_ptr<std::thread> ShutdownThread; - std::unique_ptr<zen::NamedEvent> ShutdownEvent; - - zen::ExtendableStringBuilder<64> ShutdownEventName; - ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; - ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); - - // Monitor shutdown signals - - ShutdownThread.reset(new std::thread{[&] { - zen::SetCurrentThreadName("shutdown_monitor"); - - ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); - - if (ShutdownEvent->Wait()) - { - if (!IsApplicationExitRequested()) - { - ZEN_INFO("shutdown signal received"); - Server.RequestExit(0); - } - } - else - { - ZEN_INFO("shutdown signal wait() failed"); - } - }}); - auto CleanupShutdown = zen::MakeGuard([&ShutdownEvent, &ShutdownThread] { - if (ShutdownEvent) - { - ShutdownEvent->Set(); - } - if (ShutdownThread && ShutdownThread->joinable()) - { - ShutdownThread->join(); - } - }); - - // If we have a parent process, establish the mechanisms we need - // to be able to communicate readiness with the parent - - Server.SetIsReadyFunc([&] { - IsReady = true; + m_GcScheduler.Shutdown(); + m_AdminService.reset(); + m_VfsService.reset(); + m_ObjStoreService.reset(); + m_FrontendService.reset(); - m_LockFile.Update(MakeLockData(), Ec); + m_StructuredCacheService.reset(); + m_UpstreamService.reset(); + m_UpstreamCache.reset(); + m_CacheStore = {}; - if (!ServerOptions.ChildId.empty()) - { - zen::NamedEvent ParentEvent{ServerOptions.ChildId}; - ParentEvent.Set(); - } - }); +#if ZEN_WITH_COMPUTE_SERVICES + m_HttpFunctionService.reset(); +#endif // ZEN_WITH_COMPUTE_SERVICES - Server.Run(); + m_HttpProjectService.reset(); + m_ProjectStore = {}; + m_CidService.reset(); + m_CidStore.reset(); + m_AuthService.reset(); + m_AuthMgr.reset(); + m_Http = {}; + m_JobQueue.reset(); } - catch (std::exception& e) + catch (std::exception& Ex) { - SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); - if (!IsApplicationExitRequested()) - { - RequestApplicationExit(1); - } + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } - - ShutdownLogging(); - - return ApplicationExitCode(); } -} // namespace zen - -//////////////////////////////////////////////////////////////////////////////// - -#if ZEN_PLATFORM_WINDOWS - -class ZenWindowsService : public WindowsService +void +ZenServer::EnsureIoRunner() { -public: - ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {} - - ZenWindowsService(const ZenWindowsService&) = delete; - ZenWindowsService& operator=(const ZenWindowsService&) = delete; + if (!m_IoRunner.joinable()) + { + m_IoRunner = std::thread{[this] { + SetCurrentThreadName("timer_io"); + m_IoContext.run(); + }}; + } +} - virtual int Run() override; +void +ZenServer::EnqueueTimer() +{ + m_PidCheckTimer.expires_after(std::chrono::seconds(1)); + m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); }); -private: - zen::ZenEntryPoint m_EntryPoint; -}; + EnsureIoRunner(); +} -int -ZenWindowsService::Run() +void +ZenServer::EnqueueStateMarkerTimer() { - return m_EntryPoint.Run(); + m_StateMakerTimer.expires_after(std::chrono::seconds(5)); + m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); }); + EnsureIoRunner(); } -#endif // ZEN_PLATFORM_WINDOWS - -//////////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS -int -test_main(int argc, char** argv) +void +ZenServer::EnqueueSigIntTimer() { - zen::zencore_forcelinktests(); - zen::zenhttp_forcelinktests(); - zen::zenstore_forcelinktests(); - zen::z$_forcelink(); - zen::z$service_forcelink(); - - zen::logging::InitializeLogging(); - spdlog::set_level(spdlog::level::debug); + m_SigIntTimer.expires_after(std::chrono::milliseconds(500)); + m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); }); + EnsureIoRunner(); +} - zen::MaximizeOpenFileCount(); +void +ZenServer::CheckStateMarker() +{ + std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; + try + { + if (!std::filesystem::exists(StateMarkerPath)) + { + ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); + RequestExit(1); + return; + } + } + catch (std::exception& Ex) + { + ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); + RequestExit(1); + return; + } + EnqueueStateMarkerTimer(); +} - return ZEN_RUN_TESTS(argc, argv); +void +ZenServer::CheckSigInt() +{ + if (utils::SignalCounter[SIGINT] > 0) + { + ZEN_INFO("SIGINT triggered (Ctrl+C), exiting"); + RequestExit(128 + SIGINT); + return; + } + EnqueueSigIntTimer(); } -#endif -int -main(int argc, char* argv[]) +void +ZenServer::CheckOwnerPid() { - using namespace zen; + // Pick up any new "owner" processes -#if ZEN_USE_MIMALLOC - mi_version(); -#endif + std::set<uint32_t> AddedPids; - if (argc >= 2) + for (auto& PidEntry : m_ServerEntry->SponsorPids) { - if (argv[1] == "test"sv) + if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed)) { -#if ZEN_WITH_TESTS - return test_main(argc, argv); -#else - fprintf(stderr, "test option not available in release mode!\n"); - exit(5); -#endif + if (PidEntry.compare_exchange_strong(ThisPid, 0)) + { + if (AddedPids.insert(ThisPid).second) + { + m_ProcessMonitor.AddPid(ThisPid); + + ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); + } + } } } - signal(SIGINT, utils::SignalCallbackHandler); - - try + if (m_ProcessMonitor.IsRunning()) { - ZenServerOptions ServerOptions; - ParseCliOptions(argc, argv, ServerOptions); - - if (!std::filesystem::exists(ServerOptions.DataDir)) - { - ServerOptions.IsFirstRun = true; - std::filesystem::create_directories(ServerOptions.DataDir); - } + EnqueueTimer(); + } + else + { + ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); -#if ZEN_WITH_TRACE - if (ServerOptions.TraceHost.size()) - { - TraceStart(ServerOptions.TraceHost.c_str(), TraceType::Network); - } - else if (ServerOptions.TraceFile.size()) - { - TraceStart(ServerOptions.TraceFile.c_str(), TraceType::File); - } - else - { - TraceInit(); - } - atexit(TraceShutdown); -#endif // ZEN_WITH_TRACE + RequestExit(0); + } +} -#if ZEN_PLATFORM_WINDOWS - if (ServerOptions.InstallService) - { - WindowsService::Install(); +void +ZenServer::ScrubStorage() +{ + Stopwatch Timer; + ZEN_INFO("Storage validation STARTING"); + + WorkerThreadPool ThreadPool{1}; + ScrubContext Ctx{ThreadPool}; + m_CidStore->ScrubStorage(Ctx); + m_ProjectStore->ScrubStorage(Ctx); + m_StructuredCacheService->ScrubStorage(Ctx); + + const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", + NiceTimeSpanMs(ElapsedTimeMs), + NiceBytes(Ctx.ScrubbedBytes()), + Ctx.ScrubbedChunks(), + NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); +} - std::exit(0); - } +void +ZenServer::Flush() +{ + if (m_CidStore) + m_CidStore->Flush(); - if (ServerOptions.UninstallService) - { - WindowsService::Delete(); + if (m_StructuredCacheService) + m_StructuredCacheService->Flush(); - std::exit(0); - } + if (m_ProjectStore) + m_ProjectStore->Flush(); +} - ZenWindowsService App(ServerOptions); - return App.ServiceMain(); -#else - if (ServerOptions.InstallService || ServerOptions.UninstallService) - { - throw std::runtime_error("Service mode is not supported on this platform"); - } +void +ZenServer::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Cbo << "state" << ToString(m_CurrentState); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} - ZenEntryPoint App(ServerOptions); - return App.Run(); -#endif // ZEN_PLATFORM_WINDOWS - } - catch (std::exception& Ex) +std::string_view +ZenServer::ToString(ServerState Value) +{ + switch (Value) { - fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what()); - - return 1; + case kInitializing: + return "initializing"sv; + case kRunning: + return "running"sv; + case kShuttingDown: + return "shutdown"sv; + default: + return "unknown"sv; } } + +} // namespace zen |