aboutsummaryrefslogtreecommitdiff
path: root/zenserver/zenserver.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
committerMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
commit68c951e0f440ffd483795dced737e88152c1a581 (patch)
tree5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver/zenserver.cpp
parentMerge main into linux-mac (diff)
parentTrigger storage scrubbing pass at startup (diff)
downloadzen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz
zen-68c951e0f440ffd483795dced737e88152c1a581.zip
Merged main into linux-mac
Diffstat (limited to 'zenserver/zenserver.cpp')
-rw-r--r--zenserver/zenserver.cpp220
1 files changed, 178 insertions, 42 deletions
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 53dc41a24..cf24dc224 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -11,28 +11,35 @@
#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenhttp/httpserver.h>
-#include <zenserverprocess.h>
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include <zenutil/zenserverprocess.h>
#include <fmt/format.h>
-#include <mimalloc-new-delete.h>
-#include <mimalloc.h>
+
+#if ZEN_USE_MIMALLOC
+# include <mimalloc-new-delete.h>
+# include <mimalloc.h>
+#endif
+
#include <asio.hpp>
#include <exception>
#include <list>
#include <lua.hpp>
#include <optional>
#include <regex>
+#include <set>
#include <unordered_map>
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
// in some shared code into the executable
-#define DOCTEST_CONFIG_IMPLEMENT
-#include <doctest/doctest.h>
-#undef DOCTEST_CONFIG_IMPLEMENT
+#if ZEN_WITH_TESTS
+# define DOCTEST_CONFIG_IMPLEMENT
+# include <zencore/testing.h>
+# undef DOCTEST_CONFIG_IMPLEMENT
+#endif
//////////////////////////////////////////////////////////////////////////
@@ -40,6 +47,10 @@
#include "config.h"
#include "diag/logging.h"
+#if ZEN_PLATFORM_WINDOWS
+# include "windows/service.h"
+#endif
+
//////////////////////////////////////////////////////////////////////////
// Sentry
//
@@ -82,11 +93,16 @@
#define ZEN_APP_NAME "Zen store"
+namespace zen {
+
class ZenServer
{
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+
public:
- void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid)
+ void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry)
{
+ m_ServerEntry = ServerEntry;
using namespace fmt::literals;
ZEN_INFO(ZEN_APP_NAME " initializing");
@@ -94,23 +110,29 @@ public:
if (ParentPid)
{
- m_Process.Initialize(ParentPid);
+ zen::ProcessHandle OwnerProcess;
+ OwnerProcess.Initialize(ParentPid);
- if (!m_Process.IsValid())
+ if (!OwnerProcess.IsValid())
{
ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid);
+
+ // If the pid is not reachable should we just shut down immediately? the intended owner process
+ // could have been killed or somehow crashed already
}
else
{
ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid);
}
+
+ m_ProcessMonitor.AddPid(ParentPid);
}
// Initialize/check mutex based on base port
std::string MutexName = "zen_{}"_format(BasePort);
- if (zen::NamedMutex::Exists(MutexName) || (m_ServerMutex.Create(MutexName) == false))
+ if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false)))
{
throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str());
}
@@ -151,11 +173,15 @@ public:
m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.Enabled)
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
{
const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
if (UpstreamConfig.UpstreamThreadCount < 32)
{
@@ -201,7 +227,11 @@ public:
if (UpstreamCache->Initialize())
{
- ZEN_INFO("upstream cache active");
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
}
else
{
@@ -267,7 +297,9 @@ public:
void Run()
{
- if (m_Process.IsValid())
+ Scrub();
+
+ if (m_ProcessMonitor.IsActive())
{
EnqueueTimer();
}
@@ -282,7 +314,7 @@ public:
ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ ");
}
- ZEN_INFO(ZEN_APP_NAME " now running");
+ ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId());
#if USE_SENTRY
sentry_clear_modulecache();
@@ -293,7 +325,9 @@ public:
__debugbreak();
}
- m_Http->Run(m_TestMode);
+ const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+
+ m_Http->Run(IsInteractiveMode);
ZEN_INFO(ZEN_APP_NAME " exiting");
@@ -332,18 +366,51 @@ public:
void CheckOwnerPid()
{
- if (m_Process.IsRunning())
+ // Pick up any new "owner" processes
+
+ std::set<uint32_t> AddedPids;
+
+ for (auto& PidEntry : m_ServerEntry->SponsorPids)
+ {
+ if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed))
+ {
+ if (PidEntry.compare_exchange_strong(ThisPid, 0))
+ {
+ if (AddedPids.insert(ThisPid).second)
+ {
+ m_ProcessMonitor.AddPid(ThisPid);
+
+ ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid);
+ }
+ }
+ }
+ }
+
+ if (m_ProcessMonitor.IsRunning())
{
EnqueueTimer();
}
else
{
- ZEN_INFO(ZEN_APP_NAME " exiting since parent process id {} is gone", m_Process.Pid());
+ ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone");
RequestExit(0);
}
}
+ void Scrub()
+ {
+ ZEN_INFO("Storage validation STARTING");
+
+ ScrubContext Ctx;
+ m_CasStore->Scrub(Ctx);
+ m_CidStore->Scrub(Ctx);
+ m_ProjectStore->Scrub(Ctx);
+ m_StructuredCacheService->Scrub(Ctx);
+
+ ZEN_INFO("Storage validation DONE");
+ }
+
void Flush()
{
if (m_CasStore)
@@ -366,16 +433,16 @@ private:
std::jthread m_IoRunner;
asio::io_context m_IoContext;
asio::steady_timer m_PidCheckTimer{m_IoContext};
- zen::ProcessHandle m_Process;
+ zen::ProcessMonitor m_ProcessMonitor;
zen::NamedMutex m_ServerMutex;
zen::Ref<zen::HttpServer> m_Http;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
- std::unique_ptr<ZenCacheStore> m_CacheStore;
+ std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
zen::CasGc m_Gc{*m_CasStore};
zen::CasScrubber m_Scrubber{*m_CasStore};
- HttpTestService m_TestService;
+ zen::HttpTestService m_TestService;
zen::HttpTestingService m_TestingService;
zen::HttpCasService m_CasService{*m_CasStore};
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
@@ -383,23 +450,39 @@ private:
std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- HttpAdminService m_AdminService;
- HttpHealthService m_HealthService;
+ zen::HttpAdminService m_AdminService;
+ zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
bool m_DebugOptionForcedCrash = false;
};
-int
-main(int argc, char* argv[])
+} // namespace zen
+
+class ZenWindowsService : public WindowsService
{
- mi_version();
+public:
+ ZenWindowsService(ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig)
+ : m_GlobalOptions(GlobalOptions)
+ , m_ServiceConfig(ServiceConfig)
+ {
+ }
- ZenServerOptions GlobalOptions;
- ZenServiceConfig ServiceConfig;
- ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig);
- InitializeLogging(GlobalOptions);
+ ZenWindowsService(const ZenWindowsService&) = delete;
+ ZenWindowsService& operator=(const ZenWindowsService&) = delete;
+
+ virtual int Run() override;
+
+private:
+ ZenServerOptions& m_GlobalOptions;
+ ZenServiceConfig& m_ServiceConfig;
+};
+
+int
+ZenWindowsService::Run()
+{
+ using namespace zen;
#if USE_SENTRY
// Initialize sentry.io client
@@ -408,30 +491,47 @@ main(int argc, char* argv[])
sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284");
sentry_init(SentryOptions);
- auto _ = zen::MakeGuard([&] { sentry_close(); });
+ auto _ = zen::MakeGuard([] { sentry_close(); });
#endif
- // Prototype config system, let's see how this pans out
-
- ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
-
- ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+ auto& GlobalOptions = m_GlobalOptions;
+ auto& ServiceConfig = m_ServiceConfig;
try
{
+ // Prototype config system, we'll see how this pans out
+ //
+ // TODO: we need to report any parse errors here
+
+ ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
+
+ ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+
ZenServerState ServerState;
ServerState.Initialize();
ServerState.Sweep();
- if (ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort))
+ ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort);
+
+ if (Entry)
{
// Instance already running for this port? Should double check pid
ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid);
+
+ if (GlobalOptions.OwnerPid)
+ {
+ Entry->AddSponsorProcess(GlobalOptions.OwnerPid);
+
+ std::exit(0);
+ }
}
- else
+
+ Entry = ServerState.Register(GlobalOptions.BasePort);
+
+ if (GlobalOptions.OwnerPid)
{
- ServerState.Register(GlobalOptions.BasePort);
+ Entry->AddSponsorProcess(GlobalOptions.OwnerPid);
}
std::unique_ptr<std::thread> ShutdownThread;
@@ -445,15 +545,17 @@ main(int argc, char* argv[])
Server.SetDataRoot(GlobalOptions.DataDir);
Server.SetTestMode(GlobalOptions.IsTest);
Server.SetDedicatedMode(GlobalOptions.IsDedicated);
- Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid);
+ Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry);
// Monitor shutdown signals
ShutdownThread.reset(new std::thread{[&] {
ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName);
- ShutdownEvent->Wait();
- ZEN_INFO("shutdown signal received");
- Server.RequestExit(0);
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal received");
+ Server.RequestExit(0);
+ }
}});
// If we have a parent process, establish the mechanisms we need
@@ -480,3 +582,37 @@ main(int argc, char* argv[])
return 0;
}
+
+int
+main(int argc, char* argv[])
+{
+ using namespace zen;
+
+#if ZEN_USE_MIMALLOC
+ mi_version();
+#endif
+
+ ZenServerOptions GlobalOptions;
+ ZenServiceConfig ServiceConfig;
+ ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig);
+ InitializeLogging(GlobalOptions);
+
+#if ZEN_PLATFORM_WINDOWS
+ if (GlobalOptions.InstallService)
+ {
+ WindowsService::Install();
+
+ std::exit(0);
+ }
+
+ if (GlobalOptions.UninstallService)
+ {
+ WindowsService::Delete();
+
+ std::exit(0);
+ }
+#endif
+
+ ZenWindowsService App(GlobalOptions, ServiceConfig);
+ return App.ServiceMain();
+}