// Copyright Epic Games, Inc. All Rights Reserved. #include "computeserver.h" #include #include #include #include "computeservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # if ZEN_WITH_HORDE # include # include # endif # if ZEN_WITH_NOMAD # include # include # endif ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { void ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) { Options.add_option("compute", "", "upstream-notification-endpoint", "Endpoint URL for upstream notifications", cxxopts::value(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), ""); Options.add_option("compute", "", "instance-id", "Instance ID for use in notifications", cxxopts::value(m_ServerOptions.InstanceId)->default_value(""), ""); Options.add_option("compute", "", "coordinator-endpoint", "Endpoint URL for coordinator service", cxxopts::value(m_ServerOptions.CoordinatorEndpoint)->default_value(""), ""); Options.add_option("compute", "", "idms", "Enable IDMS cloud detection; optionally specify a custom probe endpoint", cxxopts::value(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"), ""); # if ZEN_WITH_HORDE // Horde provisioning options Options.add_option("horde", "", "horde-enabled", "Enable Horde worker provisioning", cxxopts::value(m_ServerOptions.HordeConfig.Enabled)->default_value("false"), ""); Options.add_option("horde", "", "horde-server", "Horde server URL", cxxopts::value(m_ServerOptions.HordeConfig.ServerUrl)->default_value(""), ""); Options.add_option("horde", "", "horde-token", "Horde authentication token", cxxopts::value(m_ServerOptions.HordeConfig.AuthToken)->default_value(""), ""); Options.add_option("horde", "", "horde-pool", "Horde pool name", cxxopts::value(m_ServerOptions.HordeConfig.Pool)->default_value(""), ""); Options.add_option("horde", "", "horde-cluster", "Horde cluster ID ('default' or '_auto' for auto-resolve)", cxxopts::value(m_ServerOptions.HordeConfig.Cluster)->default_value("default"), ""); Options.add_option("horde", "", "horde-mode", "Horde connection mode (direct, tunnel, relay)", cxxopts::value(m_HordeModeStr)->default_value("direct"), ""); Options.add_option("horde", "", "horde-encryption", "Horde transport encryption (none, aes)", cxxopts::value(m_HordeEncryptionStr)->default_value("none"), ""); Options.add_option("horde", "", "horde-max-cores", "Maximum number of Horde cores to provision", cxxopts::value(m_ServerOptions.HordeConfig.MaxCores)->default_value("2048"), ""); Options.add_option("horde", "", "horde-host", "Host address for Horde agents to connect back to", cxxopts::value(m_ServerOptions.HordeConfig.HostAddress)->default_value(""), ""); Options.add_option("horde", "", "horde-condition", "Additional Horde agent filter condition", cxxopts::value(m_ServerOptions.HordeConfig.Condition)->default_value(""), ""); Options.add_option("horde", "", "horde-binaries", "Path to directory containing zenserver binary for remote upload", cxxopts::value(m_ServerOptions.HordeConfig.BinariesPath)->default_value(""), ""); Options.add_option("horde", "", "horde-zen-service-port", "Port number for Zen service communication", cxxopts::value(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"), ""); # endif # if ZEN_WITH_NOMAD // Nomad provisioning options Options.add_option("nomad", "", "nomad-enabled", "Enable Nomad worker provisioning", cxxopts::value(m_ServerOptions.NomadConfig.Enabled)->default_value("false"), ""); Options.add_option("nomad", "", "nomad-server", "Nomad HTTP API URL", cxxopts::value(m_ServerOptions.NomadConfig.ServerUrl)->default_value(""), ""); Options.add_option("nomad", "", "nomad-token", "Nomad ACL token", cxxopts::value(m_ServerOptions.NomadConfig.AclToken)->default_value(""), ""); Options.add_option("nomad", "", "nomad-datacenter", "Nomad target datacenter", cxxopts::value(m_ServerOptions.NomadConfig.Datacenter)->default_value("dc1"), ""); Options.add_option("nomad", "", "nomad-namespace", "Nomad namespace", cxxopts::value(m_ServerOptions.NomadConfig.Namespace)->default_value("default"), ""); Options.add_option("nomad", "", "nomad-region", "Nomad region (empty for server default)", cxxopts::value(m_ServerOptions.NomadConfig.Region)->default_value(""), ""); Options.add_option("nomad", "", "nomad-driver", "Nomad task driver (raw_exec, docker)", cxxopts::value(m_NomadDriverStr)->default_value("raw_exec"), ""); Options.add_option("nomad", "", "nomad-distribution", "Binary distribution mode (predeployed, artifact)", cxxopts::value(m_NomadDistributionStr)->default_value("predeployed"), ""); Options.add_option("nomad", "", "nomad-binary-path", "Path to zenserver on Nomad clients (predeployed mode)", cxxopts::value(m_ServerOptions.NomadConfig.BinaryPath)->default_value(""), ""); Options.add_option("nomad", "", "nomad-artifact-source", "URL to download zenserver binary (artifact mode)", cxxopts::value(m_ServerOptions.NomadConfig.ArtifactSource)->default_value(""), ""); Options.add_option("nomad", "", "nomad-docker-image", "Docker image for zenserver (docker driver)", cxxopts::value(m_ServerOptions.NomadConfig.DockerImage)->default_value(""), ""); Options.add_option("nomad", "", "nomad-max-jobs", "Maximum concurrent Nomad jobs", cxxopts::value(m_ServerOptions.NomadConfig.MaxJobs)->default_value("64"), ""); Options.add_option("nomad", "", "nomad-cpu-mhz", "CPU MHz allocated per Nomad task", cxxopts::value(m_ServerOptions.NomadConfig.CpuMhz)->default_value("1000"), ""); Options.add_option("nomad", "", "nomad-memory-mb", "Memory MB allocated per Nomad task", cxxopts::value(m_ServerOptions.NomadConfig.MemoryMb)->default_value("2048"), ""); Options.add_option("nomad", "", "nomad-cores-per-job", "Estimated cores per Nomad job (for scaling)", cxxopts::value(m_ServerOptions.NomadConfig.CoresPerJob)->default_value("32"), ""); Options.add_option("nomad", "", "nomad-max-cores", "Maximum total cores to provision via Nomad", cxxopts::value(m_ServerOptions.NomadConfig.MaxCores)->default_value("2048"), ""); Options.add_option("nomad", "", "nomad-job-prefix", "Prefix for generated Nomad job IDs", cxxopts::value(m_ServerOptions.NomadConfig.JobPrefix)->default_value("zenserver-worker"), ""); # endif } void ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { ZEN_UNUSED(Options); } void ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options) { ZEN_UNUSED(Options); } void ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) { ZEN_UNUSED(LuaOptions); } void ZenComputeServerConfigurator::ValidateOptions() { # if ZEN_WITH_HORDE horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr); horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr); # endif # if ZEN_WITH_NOMAD nomad::FromString(m_ServerOptions.NomadConfig.TaskDriver, m_NomadDriverStr); nomad::FromString(m_ServerOptions.NomadConfig.BinDistribution, m_NomadDistributionStr); # endif } /////////////////////////////////////////////////////////////////////////// ZenComputeServer::ZenComputeServer() { } ZenComputeServer::~ZenComputeServer() { Cleanup(); } int ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_TRACE_CPU("ZenComputeServer::Initialize"); ZEN_MEMSCOPE(GetZenserverTag()); ZEN_INFO(ZEN_APP_NAME " initializing in COMPUTE server mode"); const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); if (EffectiveBasePort < 0) { return EffectiveBasePort; } m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint; m_InstanceId = ServerConfig.InstanceId; // This is a workaround to make sure we can have automated tests. Without // this the ranges for different child zen compute processes could overlap with // the main test range. ZenServerEnvironment::SetBaseChildId(2000); m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); InitializeServices(ServerConfig); RegisterServices(ServerConfig); ZenServerBase::Finalize(); return EffectiveBasePort; } void ZenComputeServer::Cleanup() { ZEN_TRACE_CPU("ZenComputeServer::Cleanup"); ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { # if ZEN_WITH_HORDE // Shut down Horde provisioner first — this signals all agent threads // to exit and joins them before we tear down HTTP services. m_HordeProvisioner.reset(); # endif # if ZEN_WITH_NOMAD // Shut down Nomad provisioner — stops the management thread and // sends stop requests for all tracked jobs. m_NomadProvisioner.reset(); # endif ResolveCloudMetadata(); m_CloudMetadata.reset(); // Shut down services that own threads or use the io_context before we // stop the io_context and close the HTTP server. if (m_OrchestratorService) { m_OrchestratorService->Shutdown(); } if (m_FunctionService) { m_FunctionService->Shutdown(); } m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } if (m_Http) { m_Http->Close(); } } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); } void ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) { ZEN_TRACE_CPU("ZenComputeServer::InitializeServices"); ZEN_INFO("initializing compute services"); CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); if (!ServerConfig.IdmsEndpoint.empty()) { ZEN_INFO("detecting cloud environment (async)"); if (ServerConfig.IdmsEndpoint == "auto") { m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir] { return std::make_unique(DataDir / "cloud"); }); } else { ZEN_INFO("using custom IDMS endpoint: {}", ServerConfig.IdmsEndpoint); m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir, Endpoint = ServerConfig.IdmsEndpoint] { return std::make_unique(DataDir / "cloud", Endpoint); }); } } ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique(*m_Http); ZEN_INFO("instantiating compute service"); m_ComputeService = std::make_unique(ServerConfig.DataDir / "compute"); ZEN_INFO("instantiating orchestrator service"); m_OrchestratorService = std::make_unique(ServerConfig.DataDir / "orch"); ZEN_INFO("instantiating function service"); m_FunctionService = std::make_unique(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions"); m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); # if ZEN_WITH_NOMAD // Nomad provisioner if (ServerConfig.NomadConfig.Enabled && !ServerConfig.NomadConfig.ServerUrl.empty()) { ZEN_INFO("instantiating Nomad provisioner (server: {})", ServerConfig.NomadConfig.ServerUrl); const auto& NomadCfg = ServerConfig.NomadConfig; if (!NomadCfg.Validate()) { ZEN_ERROR("invalid Nomad configuration"); } else { ExtendableStringBuilder<256> OrchestratorEndpoint; OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get()); if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/') { OrchestratorEndpoint << '/'; } m_NomadProvisioner = std::make_unique(NomadCfg, OrchestratorEndpoint); } } # endif # if ZEN_WITH_HORDE // Horde provisioner if (ServerConfig.HordeConfig.Enabled && !ServerConfig.HordeConfig.ServerUrl.empty()) { ZEN_INFO("instantiating Horde provisioner (server: {})", ServerConfig.HordeConfig.ServerUrl); const auto& HordeConfig = ServerConfig.HordeConfig; if (!HordeConfig.Validate()) { ZEN_ERROR("invalid Horde configuration"); } else { ExtendableStringBuilder<256> OrchestratorEndpoint; OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get()); if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/') { OrchestratorEndpoint << '/'; } // If no binaries path is specified, just use the running executable's directory std::filesystem::path BinariesPath = HordeConfig.BinariesPath.empty() ? GetRunningExecutablePath().parent_path() : std::filesystem::path(HordeConfig.BinariesPath); std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde"; m_HordeProvisioner = std::make_unique(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint); } } # endif } void ZenComputeServer::ResolveCloudMetadata() { if (m_CloudMetadataFuture.valid()) { m_CloudMetadata = m_CloudMetadataFuture.get(); } } std::string ZenComputeServer::GetInstanceId() const { if (!m_InstanceId.empty()) { return m_InstanceId; } return fmt::format("{}-{}", GetMachineName(), GetCurrentProcessId()); } std::string ZenComputeServer::GetAnnounceUrl() const { return m_Http->GetServiceUri(nullptr); } void ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) { ZEN_TRACE_CPU("ZenComputeServer::RegisterServices"); ZEN_UNUSED(ServerConfig); m_Http->RegisterService(m_StatsService); if (m_ComputeService) { m_Http->RegisterService(*m_ComputeService); } if (m_ApiService) { m_Http->RegisterService(*m_ApiService); } if (m_OrchestratorService) { m_Http->RegisterService(*m_OrchestratorService); } if (m_FunctionService) { m_Http->RegisterService(*m_FunctionService); } if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } } void ZenComputeServer::PostAnnounce() { ZEN_TRACE_CPU("ZenComputeServer::PostAnnounce"); if (!m_FunctionService || m_CoordinatorEndpoint.empty()) { return; } std::string AnnounceUrl = GetAnnounceUrl(); ZEN_INFO("notifying coordinator at '{}' of our availability at '{}'", m_CoordinatorEndpoint, AnnounceUrl); try { HttpClient CoordinatorHttp(m_CoordinatorEndpoint); CbObjectWriter AnnounceBody; AnnounceBody << "id" << GetInstanceId(); AnnounceBody << "uri" << AnnounceUrl; AnnounceBody << "hostname" << GetMachineName(); ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query()); AnnounceBody.BeginObject("metrics"); Describe(Sm, AnnounceBody); AnnounceBody.EndObject(); AnnounceBody << "cpu_usage" << Sm.CpuUsagePercent; AnnounceBody << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; AnnounceBody << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; ResolveCloudMetadata(); if (m_CloudMetadata) { m_CloudMetadata->Describe(AnnounceBody); } HttpClient::Response Result = CoordinatorHttp.Post("announce", AnnounceBody.Save()); if (Result.Error) { ZEN_ERROR("failed to notify coordinator at '{}': HTTP error {} - {}", m_CoordinatorEndpoint, Result.Error->ErrorCode, Result.Error->ErrorMessage); } else if (!IsHttpOk(Result.StatusCode)) { ZEN_ERROR("failed to notify coordinator at '{}': unexpected HTTP status code {}", m_CoordinatorEndpoint, static_cast(Result.StatusCode)); } else { ZEN_INFO("successfully notified coordinator at '{}'", m_CoordinatorEndpoint); } } catch (const std::exception& Ex) { ZEN_ERROR("failed to notify coordinator at '{}': {}", m_CoordinatorEndpoint, Ex.what()); } } void ZenComputeServer::EnqueueAnnounceTimer() { if (!m_FunctionService || m_CoordinatorEndpoint.empty()) { return; } m_AnnounceTimer.expires_after(std::chrono::seconds(15)); m_AnnounceTimer.async_wait([this](const asio::error_code& Ec) { if (!Ec) { PostAnnounce(); EnqueueAnnounceTimer(); } }); EnsureIoRunner(); } void ZenComputeServer::Run() { ZEN_TRACE_CPU("ZenComputeServer::Run"); if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { // clang-format off ZEN_INFO( R"(__________ _________ __ )" "\n" R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n" R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n" R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n" R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n" R"( \/ \/ \/ \/ \/|__| \/ )"); // clang-format on ExtendableStringBuilder<256> BuildOptions; GetBuildOptions(BuildOptions, '\n'); ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); } ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId()); # if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } # endif # if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } # endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; SetNewState(kRunning); OnReady(); PostAnnounce(); EnqueueAnnounceTimer(); # if ZEN_WITH_HORDE // Start Horde provisioning if configured — request maximum allowed cores. // SetTargetCoreCount clamps to HordeConfig::MaxCores internally. if (m_HordeProvisioner) { m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); auto Stats = m_HordeProvisioner->GetStats(); ZEN_INFO("Horde provisioning started (target cores: {})", Stats.TargetCoreCount); } # endif # if ZEN_WITH_NOMAD // Start Nomad provisioning if configured — request maximum allowed cores. // SetTargetCoreCount clamps to NomadConfig::MaxCores internally. if (m_NomadProvisioner) { m_NomadProvisioner->SetTargetCoreCount(UINT32_MAX); auto Stats = m_NomadProvisioner->GetStats(); ZEN_INFO("Nomad provisioning started (target cores: {})", Stats.TargetCoreCount); } # endif m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } ////////////////////////////////////////////////////////////////////////////////// ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions) : ZenServerMain(ServerOptions) , m_ServerOptions(ServerOptions) { } void ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZEN_TRACE_CPU("ZenComputeServerMain::DoRun"); ZenComputeServer Server; Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); Server.SetDedicatedMode(m_ServerOptions.IsDedicated); const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); if (EffectiveBasePort == -1) { // Server.Initialize has already logged what the issue is - just exit with failure code here. std::exit(1); } Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != m_ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); m_ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { SetCurrentThreadName("shutdown_mon"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { ReportServiceStatus(ServiceStatus::Stopping); if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { std::error_code Ec; m_LockFile.Update(MakeLockData(true), Ec); ReportServiceStatus(ServiceStatus::Running); NotifyReady(); }); Server.Run(); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES