aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/zenserver.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-10-06 10:45:48 +0200
committerGitHub <[email protected]>2023-10-06 10:45:48 +0200
commitfb70324d37282910d7fa3047f4ec290d0c5a94b1 (patch)
treea1bc82fcfdb96eb5b461742b613fcbb63f816a54 /src/zenserver/zenserver.cpp
parentreject known bad bucket names in structured cache (#452) (diff)
downloadzen-fb70324d37282910d7fa3047f4ec290d0c5a94b1.tar.xz
zen-fb70324d37282910d7fa3047f4ec290d0c5a94b1.zip
zenserver project restructuring (#442)
Diffstat (limited to 'src/zenserver/zenserver.cpp')
-rw-r--r--src/zenserver/zenserver.cpp1487
1 files changed, 410 insertions, 1077 deletions
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 0555fab79..a8273f6c9 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -1,5 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include "zenserver.h"
+
+#include "sentryintegration.h"
+
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/config.h>
@@ -34,13 +38,6 @@
# include <pwd.h>
#endif
-#if ZEN_USE_MIMALLOC
-ZEN_THIRD_PARTY_INCLUDES_START
-# include <mimalloc-new-delete.h>
-# include <mimalloc.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-#endif
-
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
#include <asio.hpp>
@@ -48,181 +45,23 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#include <exception>
-#include <list>
-#include <optional>
-#include <regex>
#include <set>
-#include <unordered_map>
-
-//////////////////////////////////////////////////////////////////////////
-// We don't have any doctest code in this file but this is needed to bring
-// in some shared code into the executable
-
-#if ZEN_WITH_TESTS
-# define ZEN_TEST_WITH_RUNNER 1
-# include <zencore/testing.h>
-#endif
//////////////////////////////////////////////////////////////////////////
#include "config.h"
#include "diag/logging.h"
-
-#if ZEN_PLATFORM_WINDOWS
-# include "windows/service.h"
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-// Sentry
-//
-
-#if !defined(ZEN_USE_SENTRY)
-# if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64
-// vcpkg's sentry-native port does not support Arm on Mac.
-# define ZEN_USE_SENTRY 0
-# else
-# define ZEN_USE_SENTRY 1
-# endif
-#endif
-
-#if ZEN_USE_SENTRY
-# define SENTRY_BUILD_STATIC 1
-ZEN_THIRD_PARTY_INCLUDES_START
-# include <sentry.h>
-# include <spdlog/sinks/base_sink.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-// Sentry currently does not automatically add all required Windows
-// libraries to the linker when consumed via vcpkg
-
-# if ZEN_PLATFORM_WINDOWS
-# pragma comment(lib, "sentry.lib")
-# pragma comment(lib, "dbghelp.lib")
-# pragma comment(lib, "winhttp.lib")
-# pragma comment(lib, "version.lib")
-# endif
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-// Services
-//
-
-#include <zenhttp/auth/authmgr.h>
-#include <zenhttp/auth/authservice.h>
-#include <zenhttp/diagsvcs.h>
-#include <zenhttp/httpstats.h>
-#include <zenhttp/httpstatus.h>
-#include <zenhttp/httptest.h>
-#include <zenstore/gc.h>
-#include "admin/admin.h"
-#include "cache/httpstructuredcache.h"
-#include "cache/structuredcachestore.h"
-#include "compute/function.h"
-#include "frontend/frontend.h"
-#include "httpcidstore.h"
-#include "objectstore/objectstore.h"
-#include "projectstore/httpprojectstore.h"
-#include "projectstore/projectstore.h"
-#include "upstream/upstream.h"
-#include "vfs/vfsservice.h"
-
-#define ZEN_APP_NAME "Zen store"
+#include "sentryintegration.h"
namespace zen {
-using namespace std::literals;
-
namespace utils {
- static std::atomic_uint32_t SignalCounter[NSIG] = {0};
-
- static void SignalCallbackHandler(int SigNum)
- {
- if (SigNum >= 0 && SigNum < NSIG)
- {
- SignalCounter[SigNum].fetch_add(1);
- }
- }
-
-#if ZEN_USE_SENTRY
- class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex>
- {
- public:
- sentry_sink() {}
-
- protected:
- static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG,
- SENTRY_LEVEL_DEBUG,
- SENTRY_LEVEL_INFO,
- SENTRY_LEVEL_WARNING,
- SENTRY_LEVEL_ERROR,
- SENTRY_LEVEL_FATAL,
- SENTRY_LEVEL_DEBUG};
-
- void sink_it_(const spdlog::details::log_msg& msg) override
- {
- try
- {
- std::string Message =
- fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname);
- sentry_value_t event = sentry_value_new_message_event(
- /* level */ MapToSentryLevel[msg.level],
- /* logger */ nullptr,
- /* message */ Message.c_str());
- sentry_event_value_add_stacktrace(event, NULL, 0);
- sentry_capture_event(event);
- }
- catch (std::exception&)
- {
- // If our logging with Message formatting fails we do a non-allocating version and just post the msg.payload raw
- char TmpBuffer[256];
- size_t MaxCopy = Min<size_t>(msg.payload.size(), size_t(255));
- memcpy(TmpBuffer, msg.payload.data(), MaxCopy);
- TmpBuffer[MaxCopy] = '\0';
- sentry_value_t event = sentry_value_new_message_event(
- /* level */ SENTRY_LEVEL_ERROR,
- /* logger */ nullptr,
- /* message */ TmpBuffer);
- sentry_event_value_add_stacktrace(event, NULL, 0);
- sentry_capture_event(event);
- }
- }
- void flush_() override {}
- };
+ extern std::atomic_uint32_t SignalCounter[NSIG];
+}
- struct SentryAssertImpl : AssertImpl
- {
- ZEN_FORCENOINLINE ZEN_DEBUG_SECTION SentryAssertImpl() : PrevAssertImpl(CurrentAssertImpl) { CurrentAssertImpl = this; }
- virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~SentryAssertImpl() { CurrentAssertImpl = PrevAssertImpl; }
- virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename,
- int LineNumber,
- const char* FunctionName,
- const char* Msg)
- {
- try
- {
- std::string Message = fmt::format("ASSERT {}:({}) [{}]\n\"{}\"", Filename, LineNumber, FunctionName, Msg);
- sentry_value_t event = sentry_value_new_message_event(
- /* level */ SENTRY_LEVEL_ERROR,
- /* logger */ nullptr,
- /* message */ Message.c_str());
- sentry_event_value_add_stacktrace(event, NULL, 0);
- sentry_capture_event(event);
- }
- catch (std::exception&)
- {
- // If our logging with Message formatting fails we do a non-allocating version and just post the Msg raw
- sentry_value_t event = sentry_value_new_message_event(
- /* level */ SENTRY_LEVEL_ERROR,
- /* logger */ nullptr,
- /* message */ Msg);
- sentry_event_value_add_stacktrace(event, NULL, 0);
- sentry_capture_event(event);
- }
- }
- AssertImpl* PrevAssertImpl;
- };
-#endif
+using namespace std::literals;
+namespace utils {
asio::error_code ResolveHostname(asio::io_context& Ctx,
std::string_view Host,
std::string_view DefaultPort,
@@ -253,561 +92,232 @@ namespace utils {
}
} // namespace utils
-class ZenServer : public IHttpStatusProvider
-{
-public:
- ~ZenServer() {}
-
- int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
- {
- m_UseSentry = ServerOptions.NoSentry == false;
- m_ServerEntry = ServerEntry;
- m_DebugOptionForcedCrash = ServerOptions.ShouldCrash;
- const int ParentPid = ServerOptions.OwnerPid;
-
- if (ParentPid)
- {
- zen::ProcessHandle OwnerProcess;
- OwnerProcess.Initialize(ParentPid);
-
- if (!OwnerProcess.IsValid())
- {
- ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid);
-
- // If the pid is not reachable should we just shut down immediately? the intended owner process
- // could have been killed or somehow crashed already
- }
- else
- {
- ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid);
- }
-
- m_ProcessMonitor.AddPid(ParentPid);
- }
-
- // Initialize/check mutex based on base port
-
- std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort);
-
- if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false)))
- {
- throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str());
- }
-
- InitializeState(ServerOptions);
-
- m_JobQueue = MakeJobQueue(8, "backgroundjobs");
-
- m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot,
- .AbsLogPath = ServerOptions.AbsLogFile,
- .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass),
- .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)});
-
- // Ok so now we're configured, let's kick things off
-
- m_Http = zen::CreateHttpServer(ServerOptions.HttpServerConfig);
- int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort);
-
- // Setup authentication manager
- {
- std::string EncryptionKey = ServerOptions.EncryptionKey;
-
- if (EncryptionKey.empty())
- {
- EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456";
-
- ZEN_WARN("using default encryption key");
- }
-
- std::string EncryptionIV = ServerOptions.EncryptionIV;
-
- if (EncryptionIV.empty())
- {
- EncryptionIV = "0123456789abcdef";
-
- ZEN_WARN("using default encryption initialization vector");
- }
-
- m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth",
- .EncryptionKey = AesKey256Bit::FromString(EncryptionKey),
- .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)});
-
- for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders)
- {
- m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId});
- }
- }
-
- m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr);
- m_Http->RegisterService(*m_AuthService);
-
- m_Http->RegisterService(m_HealthService);
- m_Http->RegisterService(m_StatsService);
- m_Http->RegisterService(m_StatusService);
- m_StatusService.RegisterHandler("status", *this);
+//////////////////////////////////////////////////////////////////////////
- // Initialize storage and services
+ZenServer::ZenServer()
+{
+}
- ZEN_INFO("initializing storage");
+ZenServer::~ZenServer()
+{
+}
- zen::CidStoreConfiguration Config;
- Config.RootDirectory = m_DataRoot / "cas";
+void
+ZenServer::OnReady()
+{
+ m_ServerEntry->SignalReady();
- m_CidStore = std::make_unique<zen::CidStore>(m_GcManager);
- m_CidStore->Initialize(Config);
- m_CidService.reset(new zen::HttpCidService{*m_CidStore});
+ if (m_IsReadyFunc)
+ {
+ m_IsReadyFunc();
+ }
+}
- ZEN_INFO("instantiating project service");
+int
+ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ m_UseSentry = ServerOptions.NoSentry == false;
+ m_ServerEntry = ServerEntry;
+ m_DebugOptionForcedCrash = ServerOptions.ShouldCrash;
+ const int ParentPid = ServerOptions.OwnerPid;
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue);
- m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
+ if (ParentPid)
+ {
+ ProcessHandle OwnerProcess;
+ OwnerProcess.Initialize(ParentPid);
-#if ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.ComputeServiceEnabled)
+ if (!OwnerProcess.IsValid())
{
- InitializeCompute(ServerOptions);
- }
- else
- {
- ZEN_INFO("NOT instantiating compute services");
- }
-#endif // ZEN_WITH_COMPUTE_SERVICES
+ ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid);
- if (ServerOptions.StructuredCacheConfig.Enabled)
- {
- InitializeStructuredCache(ServerOptions);
+ // If the pid is not reachable should we just shut down immediately? the intended owner process
+ // could have been killed or somehow crashed already
}
else
{
- ZEN_INFO("NOT instantiating structured cache service");
+ ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid);
}
- m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
-
-#if ZEN_WITH_TESTS
- m_Http->RegisterService(m_TestingService);
-#endif
+ m_ProcessMonitor.AddPid(ParentPid);
+ }
- if (m_HttpProjectService)
- {
- m_Http->RegisterService(*m_HttpProjectService);
- }
+ // Initialize/check mutex based on base port
- m_Http->RegisterService(*m_CidService);
+ std::string MutexName = fmt::format("zen_{}", ServerOptions.BasePort);
-#if ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.ComputeServiceEnabled)
- {
- if (m_HttpFunctionService != nullptr)
- {
- m_Http->RegisterService(*m_HttpFunctionService);
- }
- }
-#endif // ZEN_WITH_COMPUTE_SERVICES
+ if (NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false)))
+ {
+ throw std::runtime_error(fmt::format("Failed to create mutex '{}' - is another instance already running?", MutexName).c_str());
+ }
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+ InitializeState(ServerOptions);
- if (m_FrontendService)
- {
- m_Http->RegisterService(*m_FrontendService);
- }
+ m_JobQueue = MakeJobQueue(8, "backgroundjobs");
- if (ServerOptions.ObjectStoreEnabled)
- {
- ObjectStoreConfig ObjCfg;
- ObjCfg.RootDirectory = m_DataRoot / "obj";
- ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort);
+ m_HealthService.SetHealthInfo({.DataRoot = m_DataRoot,
+ .AbsLogPath = ServerOptions.AbsLogFile,
+ .HttpServerClass = std::string(ServerOptions.HttpServerConfig.ServerClass),
+ .BuildVersion = std::string(ZEN_CFG_VERSION_BUILD_STRING_FULL)});
- for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets)
- {
- ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name};
- NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory;
- ObjCfg.Buckets.push_back(std::move(NewBucket));
- }
+ // Ok so now we're configured, let's kick things off
- m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg));
- m_Http->RegisterService(*m_ObjStoreService);
- }
+ m_Http = CreateHttpServer(ServerOptions.HttpServerConfig);
+ int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort);
- m_VfsService = std::make_unique<VfsService>();
- m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore));
- m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore));
- m_Http->RegisterService(*m_VfsService);
-
- ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s",
- ServerOptions.GcConfig.Enabled,
- ServerOptions.GcConfig.IntervalSeconds,
- ServerOptions.GcConfig.LightweightIntervalSeconds);
- zen::GcSchedulerConfig GcConfig{
- .RootDirectory = m_DataRoot / "gc",
- .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
- .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
- .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
- .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds),
- .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
- .Enabled = ServerOptions.GcConfig.Enabled,
- .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
- .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit,
- .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites,
- .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)};
- m_GcScheduler.Initialize(GcConfig);
-
- // Create and register admin interface last to make sure all is properly initialized
- m_AdminService =
- std::make_unique<HttpAdminService>(m_GcScheduler,
- *m_JobQueue,
- *m_CacheStore,
- HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
- .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
- .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"});
- m_Http->RegisterService(*m_AdminService);
-
- return EffectiveBasePort;
- }
-
- void InitializeState(const ZenServerOptions& ServerOptions);
- void InitializeStructuredCache(const ZenServerOptions& ServerOptions);
- void InitializeCompute(const ZenServerOptions& ServerOptions);
-
- void Run()
+ // Setup authentication manager
{
- // This is disabled for now, awaiting better scheduling
- //
- // ScrubStorage();
+ std::string EncryptionKey = ServerOptions.EncryptionKey;
- if (m_ProcessMonitor.IsActive())
+ if (EncryptionKey.empty())
{
- EnqueueTimer();
- }
+ EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456";
- if (!m_TestMode)
- {
- ZEN_INFO("__________ _________ __ ");
- ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ ");
- ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ ");
- ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ ");
- ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >");
- ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ ");
+ ZEN_WARN("using default encryption key");
}
- ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId());
+ std::string EncryptionIV = ServerOptions.EncryptionIV;
-#if ZEN_USE_SENTRY
- ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
- if (m_UseSentry)
+ if (EncryptionIV.empty())
{
- sentry_clear_modulecache();
- }
-#endif
+ EncryptionIV = "0123456789abcdef";
- if (m_DebugOptionForcedCrash)
- {
- ZEN_DEBUG_BREAK();
+ ZEN_WARN("using default encryption initialization vector");
}
- const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
-
- SetNewState(kRunning);
-
- OnReady();
-
- m_Http->Run(IsInteractiveMode);
-
- SetNewState(kShuttingDown);
-
- ZEN_INFO(ZEN_APP_NAME " exiting");
+ m_AuthMgr = AuthMgr::Create({.RootDirectory = m_DataRoot / "auth",
+ .EncryptionKey = AesKey256Bit::FromString(EncryptionKey),
+ .EncryptionIV = AesIV128Bit::FromString(EncryptionIV)});
- Flush();
- }
-
- void RequestExit(int ExitCode)
- {
- RequestApplicationExit(ExitCode);
- if (m_Http)
+ for (const ZenOpenIdProviderConfig& OpenIdProvider : ServerOptions.AuthConfig.OpenIdProviders)
{
- m_Http->RequestExit();
+ m_AuthMgr->AddOpenIdProvider({.Name = OpenIdProvider.Name, .Url = OpenIdProvider.Url, .ClientId = OpenIdProvider.ClientId});
}
}
- void Cleanup()
- {
- ZEN_INFO(ZEN_APP_NAME " cleaning up");
- try
- {
- m_IoContext.stop();
- if (m_IoRunner.joinable())
- {
- m_IoRunner.join();
- }
+ m_AuthService = std::make_unique<HttpAuthService>(*m_AuthMgr);
+ m_Http->RegisterService(*m_AuthService);
- if (m_Http)
- {
- m_Http->Close();
- }
- if (m_JobQueue)
- {
- m_JobQueue->Stop();
- }
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+ m_StatusService.RegisterHandler("status", *this);
- m_GcScheduler.Shutdown();
- m_AdminService.reset();
- m_VfsService.reset();
- m_ObjStoreService.reset();
- m_FrontendService.reset();
+ // Initialize storage and services
- m_StructuredCacheService.reset();
- m_UpstreamService.reset();
- m_UpstreamCache.reset();
- m_CacheStore = {};
+ ZEN_INFO("initializing storage");
-#if ZEN_WITH_COMPUTE_SERVICES
- m_HttpFunctionService.reset();
-#endif // ZEN_WITH_COMPUTE_SERVICES
+ CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
- m_HttpProjectService.reset();
- m_ProjectStore = {};
- m_CidService.reset();
- m_CidStore.reset();
- m_AuthService.reset();
- m_AuthMgr.reset();
- m_Http = {};
- m_JobQueue.reset();
- }
- catch (std::exception& Ex)
- {
- ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
- }
- }
+ m_CidStore = std::make_unique<CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+ m_CidService.reset(new HttpCidService{*m_CidStore});
- void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
- void SetTestMode(bool State) { m_TestMode = State; }
- void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
- void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
+ ZEN_INFO("instantiating project service");
- std::function<void()> m_IsReadyFunc;
- void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); }
- void OnReady();
+ m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue);
+ m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
- void EnsureIoRunner()
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeServiceEnabled)
{
- if (!m_IoRunner.joinable())
- {
- m_IoRunner = std::thread{[this] {
- zen::SetCurrentThreadName("timer_io");
- m_IoContext.run();
- }};
- }
+ InitializeCompute(ServerOptions);
}
-
- void EnqueueTimer()
+ else
{
- m_PidCheckTimer.expires_after(std::chrono::seconds(1));
- m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); });
-
- EnsureIoRunner();
+ ZEN_INFO("NOT instantiating compute services");
}
+#endif // ZEN_WITH_COMPUTE_SERVICES
- void EnqueueStateMarkerTimer()
+ if (ServerOptions.StructuredCacheConfig.Enabled)
{
- m_StateMakerTimer.expires_after(std::chrono::seconds(5));
- m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); });
- EnsureIoRunner();
+ InitializeStructuredCache(ServerOptions);
}
-
- void EnqueueSigIntTimer()
+ else
{
- m_SigIntTimer.expires_after(std::chrono::milliseconds(500));
- m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); });
- EnsureIoRunner();
+ ZEN_INFO("NOT instantiating structured cache service");
}
- void CheckStateMarker()
- {
- std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
- try
- {
- if (!std::filesystem::exists(StateMarkerPath))
- {
- ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath);
- RequestExit(1);
- return;
- }
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what());
- RequestExit(1);
- return;
- }
- EnqueueStateMarkerTimer();
- }
+ m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
- void CheckSigInt()
- {
- if (utils::SignalCounter[SIGINT] > 0)
- {
- ZEN_INFO("SIGINT triggered (Ctrl+C), exiting");
- RequestExit(128 + SIGINT);
- return;
- }
- EnqueueSigIntTimer();
- }
+#if ZEN_WITH_TESTS
+ m_Http->RegisterService(m_TestingService);
+#endif
- void CheckOwnerPid()
+ if (m_HttpProjectService)
{
- // Pick up any new "owner" processes
-
- std::set<uint32_t> AddedPids;
+ m_Http->RegisterService(*m_HttpProjectService);
+ }
- for (auto& PidEntry : m_ServerEntry->SponsorPids)
- {
- if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed))
- {
- if (PidEntry.compare_exchange_strong(ThisPid, 0))
- {
- if (AddedPids.insert(ThisPid).second)
- {
- m_ProcessMonitor.AddPid(ThisPid);
+ m_Http->RegisterService(*m_CidService);
- ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid);
- }
- }
- }
- }
-
- if (m_ProcessMonitor.IsRunning())
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeServiceEnabled)
+ {
+ if (m_HttpFunctionService != nullptr)
{
- EnqueueTimer();
+ m_Http->RegisterService(*m_HttpFunctionService);
}
- else
- {
- ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone");
-
- RequestExit(0);
- }
- }
-
- void ScrubStorage()
- {
- Stopwatch Timer;
- ZEN_INFO("Storage validation STARTING");
-
- WorkerThreadPool ThreadPool{1};
- ScrubContext Ctx{ThreadPool};
- m_CidStore->ScrubStorage(Ctx);
- m_ProjectStore->ScrubStorage(Ctx);
- m_StructuredCacheService->ScrubStorage(Ctx);
-
- const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
-
- ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
- NiceTimeSpanMs(ElapsedTimeMs),
- NiceBytes(Ctx.ScrubbedBytes()),
- Ctx.ScrubbedChunks(),
- NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
+#endif // ZEN_WITH_COMPUTE_SERVICES
- void Flush()
- {
- if (m_CidStore)
- m_CidStore->Flush();
-
- if (m_StructuredCacheService)
- m_StructuredCacheService->Flush();
-
- if (m_ProjectStore)
- m_ProjectStore->Flush();
- }
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
- virtual void HandleStatusRequest(HttpServerRequest& Request) override
+ if (m_FrontendService)
{
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Cbo << "state" << ToString(m_CurrentState);
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ m_Http->RegisterService(*m_FrontendService);
}
-private:
- ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
- bool m_IsDedicatedMode = false;
- bool m_TestMode = false;
- CbObject m_RootManifest;
- std::filesystem::path m_DataRoot;
- std::filesystem::path m_ContentRoot;
- std::thread m_IoRunner;
- asio::io_context m_IoContext;
- asio::steady_timer m_PidCheckTimer{m_IoContext};
- asio::steady_timer m_StateMakerTimer{m_IoContext};
- asio::steady_timer m_SigIntTimer{m_IoContext};
- zen::ProcessMonitor m_ProcessMonitor;
- zen::NamedMutex m_ServerMutex;
-
- enum ServerState
+ if (ServerOptions.ObjectStoreEnabled)
{
- kInitializing,
- kRunning,
- kShuttingDown
- } m_CurrentState = kInitializing;
+ ObjectStoreConfig ObjCfg;
+ ObjCfg.RootDirectory = m_DataRoot / "obj";
+ ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort);
- inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
-
- std::string_view ToString(ServerState Value)
- {
- switch (Value)
+ for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets)
{
- case kInitializing:
- return "initializing"sv;
- case kRunning:
- return "running"sv;
- case kShuttingDown:
- return "shutdown"sv;
- default:
- return "unknown"sv;
+ ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name};
+ NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory;
+ ObjCfg.Buckets.push_back(std::move(NewBucket));
}
+
+ m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg));
+ m_Http->RegisterService(*m_ObjStoreService);
}
- zen::Ref<zen::HttpServer> m_Http;
- std::unique_ptr<zen::AuthMgr> m_AuthMgr;
- std::unique_ptr<zen::HttpAuthService> m_AuthService;
- zen::HttpStatusService m_StatusService;
- zen::HttpStatsService m_StatsService;
- zen::GcManager m_GcManager;
- zen::GcScheduler m_GcScheduler{m_GcManager};
- std::unique_ptr<zen::CidStore> m_CidStore;
- Ref<zen::ZenCacheStore> m_CacheStore;
- zen::HttpTestService m_TestService;
-#if ZEN_WITH_TESTS
- zen::HttpTestingService m_TestingService;
-#endif
- std::unique_ptr<zen::HttpCidService> m_CidService;
- zen::RefPtr<zen::ProjectStore> m_ProjectStore;
- std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
- std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
- std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService;
- std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- zen::HttpHealthService m_HealthService;
-#if ZEN_WITH_COMPUTE_SERVICES
- std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
-#endif // ZEN_WITH_COMPUTE_SERVICES
- std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
- std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService;
- std::unique_ptr<zen::VfsService> m_VfsService;
- std::unique_ptr<JobQueue> m_JobQueue;
- std::unique_ptr<zen::HttpAdminService> m_AdminService;
+ m_VfsService = std::make_unique<VfsService>();
+ m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore));
+ m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore));
+ m_Http->RegisterService(*m_VfsService);
- bool m_DebugOptionForcedCrash = false;
- bool m_UseSentry = false;
-};
+ ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s",
+ ServerOptions.GcConfig.Enabled,
+ ServerOptions.GcConfig.IntervalSeconds,
+ ServerOptions.GcConfig.LightweightIntervalSeconds);
+ GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
+ .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds),
+ .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
+ .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit,
+ .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites,
+ .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)};
+ m_GcScheduler.Initialize(GcConfig);
-void
-ZenServer::OnReady()
-{
- m_ServerEntry->SignalReady();
+ // Create and register admin interface last to make sure all is properly initialized
+ m_AdminService =
+ std::make_unique<HttpAdminService>(m_GcScheduler,
+ *m_JobQueue,
+ *m_CacheStore,
+ HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
+ .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
+ .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"});
+ m_Http->RegisterService(*m_AdminService);
- if (m_IsReadyFunc)
- {
- m_IsReadyFunc();
- }
+ return EffectiveBasePort;
}
void
@@ -822,7 +332,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
bool UpdateManifest = false;
std::filesystem::path ManifestPath = m_DataRoot / "root_manifest";
- FileContents ManifestData = zen::ReadFile(ManifestPath);
+ FileContents ManifestData = ReadFile(ManifestPath);
if (ManifestData.ErrorCode)
{
@@ -938,7 +448,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
- zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamCacheOptions UpstreamOptions;
UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
@@ -947,7 +457,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
- m_UpstreamCache = zen::UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamCache = UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore);
m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
m_UpstreamCache->Initialize();
@@ -962,7 +472,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
{
if (!Dns.empty())
{
- const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls);
+ const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls);
if (Err)
{
ZEN_ERROR("resolve FAILED, reason '{}'", Err.message());
@@ -977,7 +487,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
{
const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name;
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::UpstreamEndpoint::CreateZenEndpoint(
+ std::unique_ptr<UpstreamEndpoint> ZenEndpoint = UpstreamEndpoint::CreateZenEndpoint(
{.Name = ZenEndpointName,
.Urls = ZenUrls,
.ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
@@ -992,22 +502,20 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
{
std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name;
- auto Options =
- zen::CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
- .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
- .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+ auto Options = CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
+ .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
- auto AuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
- .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
- .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
- .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
+ auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
- std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint =
- zen::UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr);
+ std::unique_ptr<UpstreamEndpoint> JupiterEndpoint = UpstreamEndpoint::CreateJupiterEndpoint(Options, AuthConfig, *m_AuthMgr);
m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
}
@@ -1039,37 +547,37 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name;
auto ComputeOptions =
- zen::CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.HordeConfig.Url,
- .ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
-
- auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
- .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
- .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider,
- .AccessToken = UpstreamConfig.HordeConfig.AccessToken};
+ CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.HordeConfig.Url,
+ .ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto ComputeAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.AccessToken};
auto StorageOptions =
- zen::CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl,
- .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
-
- auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl,
- .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId,
- .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
- .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
-
- m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore,
- ComputeOptions,
- StorageOptions,
- ComputeAuthConfig,
- StorageAuthConfig,
- *m_AuthMgr);
+ CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl,
+ .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto StorageAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
+
+ m_HttpFunctionService = std::make_unique<HttpFunctionService>(*m_CidStore,
+ ComputeOptions,
+ StorageOptions,
+ ComputeAuthConfig,
+ StorageAuthConfig,
+ *m_AuthMgr);
}
else
{
@@ -1078,456 +586,281 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
}
#endif // ZEN_WITH_COMPUTE_SERVICES
-////////////////////////////////////////////////////////////////////////////////
-
-class ZenEntryPoint
-{
-public:
- ZenEntryPoint(ZenServerOptions& ServerOptions);
- ZenEntryPoint(const ZenEntryPoint&) = delete;
- ZenEntryPoint& operator=(const ZenEntryPoint&) = delete;
- int Run();
-
-private:
- ZenServerOptions& m_ServerOptions;
- zen::LockFile m_LockFile;
-};
-
-ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions(ServerOptions)
-{
-}
-
-#if ZEN_USE_SENTRY
-static void
-SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata)
+void
+ZenServer::Run()
{
- char LogMessageBuffer[160];
- std::string LogMessage;
- const char* MessagePtr = LogMessageBuffer;
-
- int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args);
+ // This is disabled for now, awaiting better scheduling
+ //
+ // ScrubStorage();
- if (n >= int(sizeof LogMessageBuffer))
+ if (m_ProcessMonitor.IsActive())
{
- LogMessage.resize(n + 1);
-
- n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args);
-
- MessagePtr = LogMessage.c_str();
+ EnqueueTimer();
}
- switch (Level)
+ if (!m_TestMode)
{
- case SENTRY_LEVEL_DEBUG:
- ConsoleLog().debug("sentry: {}", MessagePtr);
- break;
-
- case SENTRY_LEVEL_INFO:
- ConsoleLog().info("sentry: {}", MessagePtr);
- break;
-
- case SENTRY_LEVEL_WARNING:
- ConsoleLog().warn("sentry: {}", MessagePtr);
- break;
-
- case SENTRY_LEVEL_ERROR:
- ConsoleLog().error("sentry: {}", MessagePtr);
- break;
-
- case SENTRY_LEVEL_FATAL:
- ConsoleLog().critical("sentry: {}", MessagePtr);
- break;
+ ZEN_INFO("__________ _________ __ ");
+ ZEN_INFO("\\____ /____ ____ / _____// |_ ___________ ____ ");
+ ZEN_INFO(" / // __ \\ / \\ \\_____ \\\\ __\\/ _ \\_ __ \\_/ __ \\ ");
+ ZEN_INFO(" / /\\ ___/| | \\ / \\| | ( <_> ) | \\/\\ ___/ ");
+ ZEN_INFO("/_______ \\___ >___| / /_______ /|__| \\____/|__| \\___ >");
+ ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ ");
}
-}
-#endif
-
-int
-ZenEntryPoint::Run()
-{
- zen::SetCurrentThreadName("main");
+ ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", GetCurrentProcessId());
#if ZEN_USE_SENTRY
- int SentryErrorCode = 0;
- std::unique_ptr<zen::utils::SentryAssertImpl> SentryAssert;
- std::string SentryUserName;
-
- if (m_ServerOptions.NoSentry == false)
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
{
- std::string SentryDatabasePath = PathToUtf8(m_ServerOptions.DataDir / ".sentry-native");
- if (SentryDatabasePath.starts_with("\\\\?\\"))
- {
- SentryDatabasePath = SentryDatabasePath.substr(4);
- }
- sentry_options_t* SentryOptions = sentry_options_new();
- sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284");
- sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str());
- sentry_options_set_logger(SentryOptions, SentryLogFunction, this);
- std::string SentryAttachmentPath = PathToUtf8(m_ServerOptions.AbsLogFile);
- if (SentryAttachmentPath.starts_with("\\\\?\\"))
- {
- SentryAttachmentPath = SentryAttachmentPath.substr(4);
- }
- sentry_options_add_attachment(SentryOptions, SentryAttachmentPath.c_str());
- sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION);
-
- // sentry_options_set_debug(SentryOptions, 1);
-
- SentryErrorCode = sentry_init(SentryOptions);
-
- if (SentryErrorCode == 0)
- {
- if (m_ServerOptions.SentryAllowPII)
- {
-# if ZEN_PLATFORM_WINDOWS
- CHAR UserNameBuffer[511 + 1];
- DWORD UserNameLength = sizeof(UserNameBuffer) / sizeof(CHAR);
- BOOL OK = GetUserNameA(UserNameBuffer, &UserNameLength);
- if (OK && UserNameLength)
- {
- SentryUserName = std::string(UserNameBuffer, UserNameLength - 1);
- }
-# endif // ZEN_PLATFORM_WINDOWS
-# if (ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC)
- uid_t uid = geteuid();
- struct passwd* pw = getpwuid(uid);
- if (pw)
- {
- SentryUserName = std::string(pw->pw_name);
- }
-# endif
- sentry_value_t SentryUserObject = sentry_value_new_object();
- sentry_value_set_by_key(SentryUserObject, "id", sentry_value_new_string(SentryUserName.c_str()));
- sentry_value_set_by_key(SentryUserObject, "username", sentry_value_new_string(SentryUserName.c_str()));
- sentry_value_set_by_key(SentryUserObject, "ip_address", sentry_value_new_string("{{auto}}"));
- sentry_set_user(SentryUserObject);
- }
-
- auto SentrySink = spdlog::create<utils::sentry_sink>("sentry");
- zen::logging::SetErrorLog(std::move(SentrySink));
-
- SentryAssert = std::make_unique<zen::utils::SentryAssertImpl>();
- }
+ SentryIntegration::ClearCaches();
}
-
- auto _ = zen::MakeGuard([&SentryAssert, SentryErrorCode] {
- if (SentryErrorCode == 0)
- {
- zen::logging::SetErrorLog(std::shared_ptr<spdlog::logger>());
- SentryAssert.reset();
- sentry_close();
- }
- });
#endif
- auto& ServerOptions = m_ServerOptions;
-
- try
+ if (m_DebugOptionForcedCrash)
{
- // Mutual exclusion and synchronization
- ZenServerState ServerState;
- ServerState.Initialize();
- ServerState.Sweep();
-
- ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(ServerOptions.BasePort);
-
- if (Entry)
- {
- if (ServerOptions.OwnerPid)
- {
- ZEN_INFO(
- "Looks like there is already a process listening to this port {} (pid: {}), attaching owner pid {} to running instance",
- ServerOptions.BasePort,
- Entry->Pid.load(),
- ServerOptions.OwnerPid);
-
- Entry->AddSponsorProcess(ServerOptions.OwnerPid);
-
- std::exit(0);
- }
- else
- {
- ZEN_WARN("Exiting since there is already a process listening to port {} (pid: {})",
- ServerOptions.BasePort,
- Entry->Pid.load());
- std::exit(1);
- }
- }
+ ZEN_DEBUG_BREAK();
+ }
- std::error_code Ec;
+ const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode;
- std::filesystem::path LockFilePath = ServerOptions.DataDir / ".lock";
+ SetNewState(kRunning);
- bool IsReady = false;
+ OnReady();
- auto MakeLockData = [&] {
- CbObjectWriter Cbo;
- Cbo << "pid" << zen::GetCurrentProcessId() << "data" << PathToUtf8(ServerOptions.DataDir) << "port" << ServerOptions.BasePort
- << "session_id" << GetSessionId() << "ready" << IsReady;
- return Cbo.Save();
- };
+ m_Http->Run(IsInteractiveMode);
- m_LockFile.Create(LockFilePath, MakeLockData(), Ec);
+ SetNewState(kShuttingDown);
- if (Ec)
- {
- ZEN_WARN("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message());
+ ZEN_INFO(ZEN_APP_NAME " exiting");
- std::exit(99);
- }
+ Flush();
+}
- InitializeLogging(ServerOptions);
+void
+ZenServer::RequestExit(int ExitCode)
+{
+ RequestApplicationExit(ExitCode);
+ if (m_Http)
+ {
+ m_Http->RequestExit();
+ }
+}
-#if ZEN_USE_SENTRY
- if (m_ServerOptions.NoSentry == false)
+void
+ZenServer::Cleanup()
+{
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
{
- if (SentryErrorCode == 0)
- {
- if (m_ServerOptions.SentryAllowPII)
- {
- ZEN_INFO("sentry initialized, username: '{}'", SentryUserName);
- }
- else
- {
- ZEN_INFO("sentry initialized with anonymous reports");
- }
- }
- else
- {
- ZEN_WARN("sentry_init returned failure! (error code: {})", SentryErrorCode);
- }
+ m_IoRunner.join();
}
-#endif
-
- MaximizeOpenFileCount();
-
- ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath);
- ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL);
-
- Entry = ServerState.Register(ServerOptions.BasePort);
-
- if (ServerOptions.OwnerPid)
+ if (m_Http)
{
- Entry->AddSponsorProcess(ServerOptions.OwnerPid);
+ m_Http->Close();
}
-
- ZenServer Server;
- Server.SetDataRoot(ServerOptions.DataDir);
- Server.SetContentRoot(ServerOptions.ContentDir);
- Server.SetTestMode(ServerOptions.IsTest);
- Server.SetDedicatedMode(ServerOptions.IsDedicated);
-
- auto ServerCleanup = zen::MakeGuard([&Server] { Server.Cleanup(); });
-
- int EffectiveBasePort = Server.Initialize(ServerOptions, Entry);
-
- Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
- if (EffectiveBasePort != ServerOptions.BasePort)
+ if (m_JobQueue)
{
- ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
- ServerOptions.BasePort = EffectiveBasePort;
+ m_JobQueue->Stop();
}
- std::unique_ptr<std::thread> ShutdownThread;
- std::unique_ptr<zen::NamedEvent> ShutdownEvent;
-
- zen::ExtendableStringBuilder<64> ShutdownEventName;
- ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown";
- ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName});
-
- // Monitor shutdown signals
-
- ShutdownThread.reset(new std::thread{[&] {
- zen::SetCurrentThreadName("shutdown_monitor");
-
- ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName);
-
- if (ShutdownEvent->Wait())
- {
- if (!IsApplicationExitRequested())
- {
- ZEN_INFO("shutdown signal received");
- Server.RequestExit(0);
- }
- }
- else
- {
- ZEN_INFO("shutdown signal wait() failed");
- }
- }});
- auto CleanupShutdown = zen::MakeGuard([&ShutdownEvent, &ShutdownThread] {
- if (ShutdownEvent)
- {
- ShutdownEvent->Set();
- }
- if (ShutdownThread && ShutdownThread->joinable())
- {
- ShutdownThread->join();
- }
- });
-
- // If we have a parent process, establish the mechanisms we need
- // to be able to communicate readiness with the parent
-
- Server.SetIsReadyFunc([&] {
- IsReady = true;
+ m_GcScheduler.Shutdown();
+ m_AdminService.reset();
+ m_VfsService.reset();
+ m_ObjStoreService.reset();
+ m_FrontendService.reset();
- m_LockFile.Update(MakeLockData(), Ec);
+ m_StructuredCacheService.reset();
+ m_UpstreamService.reset();
+ m_UpstreamCache.reset();
+ m_CacheStore = {};
- if (!ServerOptions.ChildId.empty())
- {
- zen::NamedEvent ParentEvent{ServerOptions.ChildId};
- ParentEvent.Set();
- }
- });
+#if ZEN_WITH_COMPUTE_SERVICES
+ m_HttpFunctionService.reset();
+#endif // ZEN_WITH_COMPUTE_SERVICES
- Server.Run();
+ m_HttpProjectService.reset();
+ m_ProjectStore = {};
+ m_CidService.reset();
+ m_CidStore.reset();
+ m_AuthService.reset();
+ m_AuthMgr.reset();
+ m_Http = {};
+ m_JobQueue.reset();
}
- catch (std::exception& e)
+ catch (std::exception& Ex)
{
- SPDLOG_CRITICAL("Caught exception in main: {}", e.what());
- if (!IsApplicationExitRequested())
- {
- RequestApplicationExit(1);
- }
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
}
-
- ShutdownLogging();
-
- return ApplicationExitCode();
}
-} // namespace zen
-
-////////////////////////////////////////////////////////////////////////////////
-
-#if ZEN_PLATFORM_WINDOWS
-
-class ZenWindowsService : public WindowsService
+void
+ZenServer::EnsureIoRunner()
{
-public:
- ZenWindowsService(ZenServerOptions& ServerOptions) : m_EntryPoint(ServerOptions) {}
-
- ZenWindowsService(const ZenWindowsService&) = delete;
- ZenWindowsService& operator=(const ZenWindowsService&) = delete;
+ if (!m_IoRunner.joinable())
+ {
+ m_IoRunner = std::thread{[this] {
+ SetCurrentThreadName("timer_io");
+ m_IoContext.run();
+ }};
+ }
+}
- virtual int Run() override;
+void
+ZenServer::EnqueueTimer()
+{
+ m_PidCheckTimer.expires_after(std::chrono::seconds(1));
+ m_PidCheckTimer.async_wait([this](const asio::error_code&) { CheckOwnerPid(); });
-private:
- zen::ZenEntryPoint m_EntryPoint;
-};
+ EnsureIoRunner();
+}
-int
-ZenWindowsService::Run()
+void
+ZenServer::EnqueueStateMarkerTimer()
{
- return m_EntryPoint.Run();
+ m_StateMakerTimer.expires_after(std::chrono::seconds(5));
+ m_StateMakerTimer.async_wait([this](const asio::error_code&) { CheckStateMarker(); });
+ EnsureIoRunner();
}
-#endif // ZEN_PLATFORM_WINDOWS
-
-////////////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-int
-test_main(int argc, char** argv)
+void
+ZenServer::EnqueueSigIntTimer()
{
- zen::zencore_forcelinktests();
- zen::zenhttp_forcelinktests();
- zen::zenstore_forcelinktests();
- zen::z$_forcelink();
- zen::z$service_forcelink();
-
- zen::logging::InitializeLogging();
- spdlog::set_level(spdlog::level::debug);
+ m_SigIntTimer.expires_after(std::chrono::milliseconds(500));
+ m_SigIntTimer.async_wait([this](const asio::error_code&) { CheckSigInt(); });
+ EnsureIoRunner();
+}
- zen::MaximizeOpenFileCount();
+void
+ZenServer::CheckStateMarker()
+{
+ std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
+ try
+ {
+ if (!std::filesystem::exists(StateMarkerPath))
+ {
+ ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath);
+ RequestExit(1);
+ return;
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what());
+ RequestExit(1);
+ return;
+ }
+ EnqueueStateMarkerTimer();
+}
- return ZEN_RUN_TESTS(argc, argv);
+void
+ZenServer::CheckSigInt()
+{
+ if (utils::SignalCounter[SIGINT] > 0)
+ {
+ ZEN_INFO("SIGINT triggered (Ctrl+C), exiting");
+ RequestExit(128 + SIGINT);
+ return;
+ }
+ EnqueueSigIntTimer();
}
-#endif
-int
-main(int argc, char* argv[])
+void
+ZenServer::CheckOwnerPid()
{
- using namespace zen;
+ // Pick up any new "owner" processes
-#if ZEN_USE_MIMALLOC
- mi_version();
-#endif
+ std::set<uint32_t> AddedPids;
- if (argc >= 2)
+ for (auto& PidEntry : m_ServerEntry->SponsorPids)
{
- if (argv[1] == "test"sv)
+ if (uint32_t ThisPid = PidEntry.load(std::memory_order_relaxed))
{
-#if ZEN_WITH_TESTS
- return test_main(argc, argv);
-#else
- fprintf(stderr, "test option not available in release mode!\n");
- exit(5);
-#endif
+ if (PidEntry.compare_exchange_strong(ThisPid, 0))
+ {
+ if (AddedPids.insert(ThisPid).second)
+ {
+ m_ProcessMonitor.AddPid(ThisPid);
+
+ ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid);
+ }
+ }
}
}
- signal(SIGINT, utils::SignalCallbackHandler);
-
- try
+ if (m_ProcessMonitor.IsRunning())
{
- ZenServerOptions ServerOptions;
- ParseCliOptions(argc, argv, ServerOptions);
-
- if (!std::filesystem::exists(ServerOptions.DataDir))
- {
- ServerOptions.IsFirstRun = true;
- std::filesystem::create_directories(ServerOptions.DataDir);
- }
+ EnqueueTimer();
+ }
+ else
+ {
+ ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone");
-#if ZEN_WITH_TRACE
- if (ServerOptions.TraceHost.size())
- {
- TraceStart(ServerOptions.TraceHost.c_str(), TraceType::Network);
- }
- else if (ServerOptions.TraceFile.size())
- {
- TraceStart(ServerOptions.TraceFile.c_str(), TraceType::File);
- }
- else
- {
- TraceInit();
- }
- atexit(TraceShutdown);
-#endif // ZEN_WITH_TRACE
+ RequestExit(0);
+ }
+}
-#if ZEN_PLATFORM_WINDOWS
- if (ServerOptions.InstallService)
- {
- WindowsService::Install();
+void
+ZenServer::ScrubStorage()
+{
+ Stopwatch Timer;
+ ZEN_INFO("Storage validation STARTING");
+
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext Ctx{ThreadPool};
+ m_CidStore->ScrubStorage(Ctx);
+ m_ProjectStore->ScrubStorage(Ctx);
+ m_StructuredCacheService->ScrubStorage(Ctx);
+
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
+}
- std::exit(0);
- }
+void
+ZenServer::Flush()
+{
+ if (m_CidStore)
+ m_CidStore->Flush();
- if (ServerOptions.UninstallService)
- {
- WindowsService::Delete();
+ if (m_StructuredCacheService)
+ m_StructuredCacheService->Flush();
- std::exit(0);
- }
+ if (m_ProjectStore)
+ m_ProjectStore->Flush();
+}
- ZenWindowsService App(ServerOptions);
- return App.ServiceMain();
-#else
- if (ServerOptions.InstallService || ServerOptions.UninstallService)
- {
- throw std::runtime_error("Service mode is not supported on this platform");
- }
+void
+ZenServer::HandleStatusRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo << "state" << ToString(m_CurrentState);
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
- ZenEntryPoint App(ServerOptions);
- return App.Run();
-#endif // ZEN_PLATFORM_WINDOWS
- }
- catch (std::exception& Ex)
+std::string_view
+ZenServer::ToString(ServerState Value)
+{
+ switch (Value)
{
- fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what());
-
- return 1;
+ case kInitializing:
+ return "initializing"sv;
+ case kRunning:
+ return "running"sv;
+ case kShuttingDown:
+ return "shutdown"sv;
+ default:
+ return "unknown"sv;
}
}
+
+} // namespace zen