// Copyright Epic Games, Inc. All Rights Reserved. #include "zenproxyserver.h" #include "frontend/frontend.h" #include "proxy/httpproxystats.h" #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { ////////////////////////////////////////////////////////////////////////// // Configurator void ZenProxyServerConfigurator::AddCliOptions(cxxopts::Options& Options) { Options.add_option("proxy", "", "proxy-map", "Proxy mapping (see documentation for full format)", cxxopts::value>(m_RawProxyMappings), ""); Options.parse_positional({"proxy-map"}); Options.show_positional_help(); } void ZenProxyServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { ZEN_UNUSED(Options); } void ZenProxyServerConfigurator::ApplyOptions(cxxopts::Options& Options) { ZEN_UNUSED(Options); } void ZenProxyServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) { ZEN_UNUSED(LuaOptions); } static ProxyMapping ParseProxyMapping(const std::string& Raw) { // Preferred format using "=" as the listen/target separator: // listen_spec=target_spec // where listen_spec is [addr:]port or unix:path // and target_spec is host:port or unix:path // // Examples: // 9000=127.0.0.1:8558 (TCP -> TCP) // 10.0.0.1:9000=10.0.0.2:8558 (TCP -> TCP) // 9000=unix:/tmp/target.sock (TCP -> Unix) // 9000=unix:C:\Users\foo\zen.sock (TCP -> Unix, Windows path) // unix:/tmp/listen.sock=localhost:8558 (Unix -> TCP) // unix:C:\foo\l.sock=unix:C:\foo\t.sock (Unix -> Unix, Windows paths) // // Legacy format using colon-only separators (no "=" present): // [listen_addr:]listen_port:target_host:target_port (TCP -> TCP) // [listen_addr:]listen_port:unix:target_socket_path (TCP -> Unix) // unix:listen_socket_path:target_host:target_port (Unix -> TCP, path must not contain colons) // unix:listen_socket_path:unix:target_socket_path (Unix -> Unix, listen path must not contain colons) auto ThrowBadMapping = [&](std::string_view Detail) { throw OptionParseException(fmt::format("invalid proxy mapping '{}': {}", Raw, Detail), ""); }; auto ParsePort = [&](std::string_view Field) -> uint16_t { std::optional Port = ParseInt(Field); if (!Port) { ThrowBadMapping(fmt::format("'{}' is not a valid port number", Field)); } return *Port; }; auto RequireNonEmpty = [&](std::string_view Value, std::string_view Label) { if (Value.empty()) { ThrowBadMapping(fmt::format("empty {}", Label)); } }; // Parse a listen spec: [addr:]port or unix:path auto ParseListenSpec = [&](std::string_view Spec, ProxyMapping& Out) { if (Spec.substr(0, 5) == "unix:") { Out.ListenUnixSocket = Spec.substr(5); RequireNonEmpty(Out.ListenUnixSocket, "listen unix socket path"); } else { size_t ColonPos = Spec.find(':'); if (ColonPos == std::string_view::npos) { Out.ListenPort = ParsePort(Spec); } else { Out.ListenAddress = Spec.substr(0, ColonPos); Out.ListenPort = ParsePort(Spec.substr(ColonPos + 1)); } } }; // Parse a target spec: host:port or unix:path auto ParseTargetSpec = [&](std::string_view Spec, ProxyMapping& Out) { if (Spec.substr(0, 5) == "unix:") { Out.TargetUnixSocket = Spec.substr(5); RequireNonEmpty(Out.TargetUnixSocket, "target unix socket path"); } else { size_t ColonPos = Spec.rfind(':'); if (ColonPos == std::string_view::npos) { ThrowBadMapping("target must be host:port or unix:path"); } Out.TargetHost = Spec.substr(0, ColonPos); Out.TargetPort = ParsePort(Spec.substr(ColonPos + 1)); } }; ProxyMapping Mapping; // Check for the "=" separator first. size_t EqPos = Raw.find('='); if (EqPos != std::string::npos) { std::string_view ListenSpec = std::string_view(Raw).substr(0, EqPos); std::string_view TargetSpec = std::string_view(Raw).substr(EqPos + 1); RequireNonEmpty(ListenSpec, "listen spec"); RequireNonEmpty(TargetSpec, "target spec"); ParseListenSpec(ListenSpec, Mapping); ParseTargetSpec(TargetSpec, Mapping); return Mapping; } // Legacy colon-only format. Extract fields left-to-right; when we encounter the // "unix" keyword, everything after the next colon is the socket path taken verbatim. // Listen-side unix socket paths must not contain colons in this format. auto RequireColon = [&](size_t From) -> size_t { size_t Pos = Raw.find(':', From); if (Pos == std::string::npos) { ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); } return Pos; }; size_t Pos1 = RequireColon(0); std::string Field1 = Raw.substr(0, Pos1); size_t Pos2 = RequireColon(Pos1 + 1); std::string Field2 = Raw.substr(Pos1 + 1, Pos2 - Pos1 - 1); // unix:listen_path:... if (Field1 == "unix") { Mapping.ListenUnixSocket = Field2; RequireNonEmpty(Mapping.ListenUnixSocket, "listen unix socket path"); ParseTargetSpec(std::string_view(Raw).substr(Pos2 + 1), Mapping); return Mapping; } // listen_port:unix:target_socket_path if (Field2 == "unix") { Mapping.ListenPort = ParsePort(Field1); Mapping.TargetUnixSocket = Raw.substr(Pos2 + 1); RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); return Mapping; } size_t Pos3 = Raw.find(':', Pos2 + 1); if (Pos3 == std::string::npos) { // listen_port:target_host:target_port Mapping.ListenPort = ParsePort(Field1); Mapping.TargetHost = Field2; Mapping.TargetPort = ParsePort(std::string_view(Raw).substr(Pos2 + 1)); return Mapping; } std::string Field3 = Raw.substr(Pos2 + 1, Pos3 - Pos2 - 1); // listen_addr:listen_port:unix:target_socket_path if (Field3 == "unix") { Mapping.ListenAddress = Field1; Mapping.ListenPort = ParsePort(Field2); Mapping.TargetUnixSocket = Raw.substr(Pos3 + 1); RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); return Mapping; } // listen_addr:listen_port:target_host:target_port std::string Field4 = Raw.substr(Pos3 + 1); if (Field4.find(':') != std::string::npos) { ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); } Mapping.ListenAddress = Field1; Mapping.ListenPort = ParsePort(Field2); Mapping.TargetHost = Field3; Mapping.TargetPort = ParsePort(Field4); return Mapping; } void ZenProxyServerConfigurator::ValidateOptions() { if (m_ServerOptions.BasePort == 0) { m_ServerOptions.BasePort = ZenProxyServerConfig::kDefaultProxyPort; } if (m_ServerOptions.DataDir.empty()) { std::filesystem::path SystemRoot = m_ServerOptions.SystemRootDir; if (SystemRoot.empty()) { SystemRoot = PickDefaultSystemRootDirectory(); } if (!SystemRoot.empty()) { m_ServerOptions.DataDir = SystemRoot / "Proxy"; } } for (const std::string& Raw : m_RawProxyMappings) { // The mode keyword "proxy" from argv[1] gets captured as a positional // argument — skip it. if (Raw == "proxy") { continue; } m_ServerOptions.ProxyMappings.push_back(ParseProxyMapping(Raw)); } } ////////////////////////////////////////////////////////////////////////// // ZenProxyServer ZenProxyServer::ZenProxyServer() { } ZenProxyServer::~ZenProxyServer() { Cleanup(); } int ZenProxyServer::Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) { ZEN_TRACE_CPU("ZenProxyServer::Initialize"); ZEN_MEMSCOPE(GetZenserverTag()); ZEN_INFO(ZEN_APP_NAME " initializing in PROXY server mode"); const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); if (EffectiveBasePort < 0) { return EffectiveBasePort; } for (const ProxyMapping& Mapping : ServerConfig.ProxyMappings) { auto Service = std::make_unique(m_ProxyIoContext, Mapping); Service->Start(); m_ProxyServices.push_back(std::move(Service)); } // Keep the io_context alive even when there is no pending work, so that // worker threads don't exit prematurely between async operations. m_ProxyIoWorkGuard.emplace(m_ProxyIoContext.get_executor()); // Start proxy I/O worker threads. Use a modest thread count — proxy work is // I/O-bound so we don't need a thread per core, but having more than one // avoids head-of-line blocking when many connections are active. unsigned int ThreadCount = std::max(GetHardwareConcurrency() / 4, 4u); for (unsigned int i = 0; i < ThreadCount; ++i) { m_ProxyIoThreads.emplace_back([this, i] { ExtendableStringBuilder<32> ThreadName; ThreadName << "proxy_io_" << i; SetCurrentThreadName(ThreadName); m_ProxyIoContext.run(); }); } ZEN_INFO("proxy I/O thread pool started with {} threads", ThreadCount); m_ApiService = std::make_unique(*m_Http); m_Http->RegisterService(*m_ApiService); m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); m_Http->RegisterService(*m_FrontendService); std::string DefaultRecordDir = (m_DataRoot / "recordings").string(); m_ProxyStatsService = std::make_unique(m_ProxyServices, m_StatsService, std::move(DefaultRecordDir)); m_Http->RegisterService(*m_ProxyStatsService); EnsureIoRunner(); ZenServerBase::Finalize(); return EffectiveBasePort; } void ZenProxyServer::Run() { if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); } if (!m_TestMode) { // clang-format off ZEN_INFO(R"(__________ __________ )" "\n" R"(\____ /____ ____ \______ \_______ _______ ______.__. )" "\n" R"( / // __ \ / \ | ___/\_ __ \/ _ \ \/ < | | )" "\n" R"( / /\ ___/| | \ | | | | \( <_> > < \___ | )" "\n" R"(/_______ \___ >___| / |____| |__| \____/__/\_ \/ ____| )" "\n" R"( \/ \/ \/ \/\/ )"); // clang-format on ExtendableStringBuilder<256> BuildOptions; GetBuildOptions(BuildOptions, '\n'); ZEN_INFO("Build options ({}/{}, {}):\n{}", GetOperatingSystemName(), GetCpuName(), GetCompilerName(), BuildOptions); } ZEN_INFO(ZEN_APP_NAME " now running as PROXY (pid: {})", GetCurrentProcessId()); #if ZEN_PLATFORM_WINDOWS if (zen::windows::IsRunningOnWine()) { ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); } #endif #if ZEN_USE_SENTRY ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); if (m_UseSentry) { SentryIntegration::ClearCaches(); } #endif const bool IsInteractiveMode = IsInteractiveSession(); SetNewState(kRunning); OnReady(); m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); } void ZenProxyServer::Cleanup() { ZEN_TRACE_CPU("ZenProxyServer::Cleanup"); ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { for (auto& Service : m_ProxyServices) { Service->Stop(); } m_ProxyIoWorkGuard.reset(); // releases the work guard, allowing io_context to finish m_ProxyIoContext.stop(); for (auto& Thread : m_ProxyIoThreads) { if (Thread.joinable()) { Thread.join(); } } m_ProxyIoThreads.clear(); m_ProxyServices.clear(); m_IoContext.stop(); if (m_IoRunner.joinable()) { m_IoRunner.join(); } m_ProxyStatsService.reset(); m_FrontendService.reset(); m_ApiService.reset(); ShutdownServices(); if (m_Http) { m_Http->Close(); } } catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } } ////////////////////////////////////////////////////////////////////////// // ZenProxyServerMain ZenProxyServerMain::ZenProxyServerMain(ZenProxyServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) { } void ZenProxyServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZenProxyServer Server; Server.SetServerMode("Proxy"); Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); Server.SetDedicatedMode(m_ServerOptions.IsDedicated); const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); if (EffectiveBasePort == -1) { std::exit(1); } Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); if (EffectiveBasePort != m_ServerOptions.BasePort) { ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); m_ServerOptions.BasePort = EffectiveBasePort; } std::unique_ptr ShutdownThread; std::unique_ptr ShutdownEvent; ExtendableStringBuilder<64> ShutdownEventName; ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); ShutdownThread.reset(new std::thread{[&] { SetCurrentThreadName("shutdown_mon"); ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); if (ShutdownEvent->Wait()) { ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); Server.RequestExit(0); } else { ZEN_INFO("shutdown signal wait() failed"); } }}); auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { ReportServiceStatus(ServiceStatus::Stopping); if (ShutdownEvent) { ShutdownEvent->Set(); } if (ShutdownThread && ShutdownThread->joinable()) { ShutdownThread->join(); } }); Server.SetIsReadyFunc([&] { std::error_code Ec; m_LockFile.Update(MakeLockData(true), Ec); ReportServiceStatus(ServiceStatus::Running); NotifyReady(); }); Server.Run(); } } // namespace zen