// Copyright Epic Games, Inc. All Rights Reserved. #include "zenhubserver.h" #include "frontend/frontend.h" #include "httphubservice.h" #include "hub.h" #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { void ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) { Options.add_option("hub", "", "upstream-notification-endpoint", "Endpoint URL for upstream notifications", cxxopts::value(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), ""); Options.add_option("hub", "", "instance-id", "Instance ID for use in notifications", cxxopts::value(m_ServerOptions.InstanceId)->default_value(""), ""); Options.add_option("hub", "", "consul-endpoint", "Consul endpoint URL for service registration (empty = disabled)", cxxopts::value(m_ServerOptions.ConsulEndpoint)->default_value(""), ""); Options.add_option("hub", "", "hub-base-port-number", "Base port number for provisioned instances", cxxopts::value(m_ServerOptions.HubBasePortNumber)->default_value("21000"), ""); Options.add_option("hub", "", "hub-instance-limit", "Maximum number of provisioned instances for this hub", cxxopts::value(m_ServerOptions.HubInstanceLimit)->default_value("1000"), ""); #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", "hub-use-job-object", "Enable the use of a Windows Job Object for child process management", cxxopts::value(m_ServerOptions.HubUseJobObject)->default_value("true"), ""); #endif // ZEN_PLATFORM_WINDOWS } void ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { ZEN_UNUSED(Options); } void ZenHubServerConfigurator::ApplyOptions(cxxopts::Options& Options) { ZEN_UNUSED(Options); } void ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) { ZEN_UNUSED(LuaOptions); } void ZenHubServerConfigurator::ValidateOptions() { } /////////////////////////////////////////////////////////////////////////// ZenHubServer::ZenHubServer() { } ZenHubServer::~ZenHubServer() { Cleanup(); } void ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { if (m_ConsulClient) { consul::ServiceRegistrationInfo ServiceInfo{ .ServiceId = std::string(ModuleId), .ServiceName = "zen-storage", // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri? .Port = Info.Port, .HealthEndpoint = "health", .Tags = std::vector>{std::make_pair("module", std::string(ModuleId)), std::make_pair("zen-hub", std::string(HubInstanceId)), std::make_pair("version", std::string(ZEN_CFG_VERSION))}, .HealthIntervalSeconds = 10, .DeregisterAfterSeconds = 30}; if (!m_ConsulClient->RegisterService(ServiceInfo)) { ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); } else { ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", ModuleId, Info.Port, ServiceInfo.ServiceName); } } } void ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { ZEN_UNUSED(HubInstanceId); if (m_ConsulClient) { if (!m_ConsulClient->DeregisterService(ModuleId)) { ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", ModuleId, Info.Port); } else { ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); } } } int ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_TRACE_CPU("ZenHubServer::Initialize"); ZEN_MEMSCOPE(GetZenserverTag()); ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode"); const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); if (EffectiveBasePort < 0) { return EffectiveBasePort; } // This is a workaround to make sure we can have automated tests. Without // this the ranges for different child zen hub processes could overlap with // the main test range. ZenServerEnvironment::SetBaseChildId(1000); m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); InitializeConsulRegistration(ServerConfig, EffectiveBasePort); InitializeServices(ServerConfig); RegisterServices(ServerConfig); ZenServerBase::Finalize(); return EffectiveBasePort; } void ZenHubServer::Cleanup() { ZEN_TRACE_CPU("ZenHubServer::Cleanup"); ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } ShutdownServices(); if (m_Http) { m_Http->Close(); } m_FrontendService.reset(); m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); m_ConsulRegistration.reset(); m_ConsulClient.reset(); } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } void ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); } void ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); ZEN_INFO("instantiating Hub"); m_Hub = std::make_unique( Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject, .BasePortNumber = ServerConfig.HubBasePortNumber, .InstanceLimit = ServerConfig.HubInstanceLimit}, ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers"), m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); } : Hub::ProvisionModuleCallbackFunc{}, m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); } : Hub::ProvisionModuleCallbackFunc{}); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique(*m_Http); ZEN_INFO("instantiating hub service"); m_HubService = std::make_unique(*m_Hub); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); } void ZenHubServer::RegisterServices(const ZenHubServerConfig& ServerConfig) { ZEN_UNUSED(ServerConfig); if (m_HubService) { m_Http->RegisterService(*m_HubService); } if (m_ApiService) { m_Http->RegisterService(*m_ApiService); } if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } } void ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfig, int EffectivePort) { if (ServerConfig.ConsulEndpoint.empty()) { ZEN_INFO("Consul registration disabled (no endpoint configured)"); return; } ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint); try { m_ConsulClient = std::make_unique(ServerConfig.ConsulEndpoint); consul::ServiceRegistrationInfo Info; Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); Info.ServiceName = "zen-hub"; // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? Info.Port = static_cast(EffectivePort); Info.HealthEndpoint = "hub/health"; Info.Tags = std::vector>{ std::make_pair("zen-hub", Info.ServiceId), std::make_pair("version", std::string(ZEN_CFG_VERSION)), std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)), std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)), std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))}; m_ConsulRegistration = std::make_unique(m_ConsulClient.get(), Info); ZEN_INFO("Consul service registration initiated for service ID: {}", Info.ServiceId); } catch (const std::exception& Ex) { // REQ-F-12: Hub should start successfully even if Consul registration fails ZEN_WARN("Failed to initialize Consul registration (hub will continue without it): {}", Ex.what()); m_ConsulRegistration.reset(); m_ConsulClient.reset(); } } void ZenHubServer::Run() { if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { // clang-format off ZEN_INFO(R"(__________ ___ ___ ___. )" "\n" R"(\____ /____ ____ / | \ __ _\_ |__ )" "\n" R"( / // __ \ / \ / ~ \ | \ __ \ )" "\n" R"( / /\ ___/| | \ \ Y / | / \_\ \)" "\n" R"(/_______ \___ >___| / \___|_ /|____/|___ /)" "\n" R"( \/ \/ \/ \/ \/ )"); // clang-format on ExtendableStringBuilder<256> BuildOptions; GetBuildOptions(BuildOptions, '\n'); ZEN_INFO("Build options ({}/{}, {}):\n{}", GetOperatingSystemName(), GetCpuName(), GetCompilerName(), BuildOptions); } ZEN_INFO(ZEN_APP_NAME " now running as HUB (pid: {})", GetCurrentProcessId()); #if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } #endif #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } #endif if (m_DebugOptionForcedCrash) { ZEN_DEBUG_BREAK(); } const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; if (m_ConsulRegistration) { if (!m_ConsulRegistration->IsRegistered()) { ZEN_INFO("Waiting for consul integration to register..."); m_ConsulRegistration->WaitForReadyEvent(2000); } if (!m_ConsulRegistration->IsRegistered()) { m_ConsulClient.reset(); m_ConsulRegistration.reset(); ZEN_WARN("Consul registration failed, running without consul integration"); } } SetNewState(kRunning); OnReady(); m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } ////////////////////////////////////////////////////////////////////////////////// ZenHubServerMain::ZenHubServerMain(ZenHubServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) { } void ZenHubServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZenHubServer Server; Server.SetServerMode("Hub"); Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); Server.SetDedicatedMode(m_ServerOptions.IsDedicated); const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); if (EffectiveBasePort == -1) { // Server.Initialize has already logged what the issue is - just exit with failure code here. std::exit(1); } Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != m_ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); m_ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { SetCurrentThreadName("shutdown_mon"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { ReportServiceStatus(ServiceStatus::Stopping); if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); // If we have a parent process, establish the mechanisms we need // to be able to communicate readiness with the parent Server.SetIsReadyFunc([&] { std::error_code Ec; m_LockFile.Update(MakeLockData(true), Ec); ReportServiceStatus(ServiceStatus::Running); NotifyReady(); }); Server.Run(); } } // namespace zen