// Copyright Epic Games, Inc. All Rights Reserved. #include "computeserver.h" #include #include "computeservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { void ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) { Options.add_option("compute", "", "upstream-notification-endpoint", "Endpoint URL for upstream notifications", cxxopts::value(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), ""); Options.add_option("compute", "", "instance-id", "Instance ID for use in notifications", cxxopts::value(m_ServerOptions.InstanceId)->default_value(""), ""); } void ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { ZEN_UNUSED(Options); } void ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options) { ZEN_UNUSED(Options); } void ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) { ZEN_UNUSED(LuaOptions); } void ZenComputeServerConfigurator::ValidateOptions() { } /////////////////////////////////////////////////////////////////////////// ZenComputeServer::ZenComputeServer() { } ZenComputeServer::~ZenComputeServer() { Cleanup(); } int ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_TRACE_CPU("ZenComputeServer::Initialize"); ZEN_MEMSCOPE(GetZenserverTag()); ZEN_INFO(ZEN_APP_NAME " initializing in COMPUTE server mode"); const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); if (EffectiveBasePort < 0) { return EffectiveBasePort; } // This is a workaround to make sure we can have automated tests. Without // this the ranges for different child zen compute processes could overlap with // the main test range. ZenServerEnvironment::SetBaseChildId(1000); m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); InitializeServices(ServerConfig); RegisterServices(ServerConfig); ZenServerBase::Finalize(); return EffectiveBasePort; } void ZenComputeServer::Cleanup() { ZEN_TRACE_CPU("ZenComputeServer::Cleanup"); ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } if (m_Http) { m_Http->Close(); } } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); } void ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) { ZEN_INFO("initializing storage"); CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique(*m_Http); ZEN_INFO("instantiating compute service"); m_ComputeService = std::make_unique(ServerConfig.DataDir / "compute"); // Ref Runner; // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner"); // TODO: (re)implement default configuration here ZEN_INFO("instantiating function service"); m_FunctionService = std::make_unique(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions"); } void ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); if (m_ComputeService) { m_Http->RegisterService(*m_ComputeService); } if (m_ApiService) { m_Http->RegisterService(*m_ApiService); } if (m_FunctionService) { m_Http->RegisterService(*m_FunctionService); } } void ZenComputeServer::Run() { if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { // clang-format off ZEN_INFO( R"(__________ _________ __ )" "\n" R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n" R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n" R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n" R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n" R"( \/ \/ \/ \/ \/|__| \/ )"); // clang-format on ExtendableStringBuilder<256> BuildOptions; GetBuildOptions(BuildOptions, '\n'); ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); } ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId()); # if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } # endif # if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } # endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; SetNewState(kRunning); OnReady(); m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } ////////////////////////////////////////////////////////////////////////////////// ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions) : ZenServerMain(ServerOptions) , m_ServerOptions(ServerOptions) { } void ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZenComputeServer Server; Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); Server.SetDedicatedMode(m_ServerOptions.IsDedicated); const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); if (EffectiveBasePort == -1) { // Server.Initialize has already logged what the issue is - just exit with failure code here. std::exit(1); } Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != m_ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); m_ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { SetCurrentThreadName("shutdown_mon"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { ReportServiceStatus(ServiceStatus::Stopping); if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { std::error_code Ec; m_LockFile.Update(MakeLockData(true), Ec); ReportServiceStatus(ServiceStatus::Running); NotifyReady(); }); Server.Run(); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES