diff options
| author | Stefan Boberg <[email protected]> | 2026-03-12 15:03:03 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-12 15:03:03 +0100 |
| commit | 81bc43aa96f0059cecb28d1bd88338b7d84667f9 (patch) | |
| tree | a3428cb7fddceae0b284d33562af5bf3e64a367e /src/zenserver/proxy/zenproxyserver.cpp | |
| parent | update fmt 12.0.0 -> 12.1.0 (#828) (diff) | |
| download | zen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.tar.xz zen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.zip | |
Transparent proxy mode (#823)
Adds a **transparent TCP proxy mode** to zenserver (activated via `zenserver proxy`), allowing it to sit between clients and upstream Zen servers to inspect and monitor HTTP/1.x traffic in real time. Primarily useful during development, to be able to observe multi-server/client interactions in one place.
- **Dedicated proxy port** -- Proxy mode defaults to port 8118 with its own data directory to avoid collisions with a normal zenserver instance.
- **TCP proxy core** (`src/zenserver/proxy/`) -- A new transparent TCP proxy that forwards connections to upstream targets, with support for both TCP/IP and Unix socket listeners. Multi-threaded I/O for connection handling. Supports Unix domain sockets for both upstream and downstream connections.
- **HTTP traffic inspection** -- Parses HTTP/1.x request/response streams inline to extract method, path, status, content length, and WebSocket upgrades without breaking the proxied data.
- **Proxy dashboard** -- A web UI showing live connection stats, per-target request counts, active connections, bytes transferred, and client IP/session ID rollups.
- **Server mode display** -- Dashboard banner now shows the running server mode (Zen Proxy, Zen Compute, etc.).
Supporting changes included in this branch:
- **Wildcard log level matching** -- Log levels can now be set per-category using wildcard patterns (e.g. `proxy.*=debug`).
- **`zen down --all`** -- New flag to shut down all running zenserver instances; also used by the new `xmake kill` task.
- Minor test stability fixes (flaky hash collisions, per-thread RNG seeds).
- Support ZEN_MALLOC environment variable for default allocator selection and switch default to rpmalloc
- Fixed sentry-native build to allow LTO on Windows
Diffstat (limited to 'src/zenserver/proxy/zenproxyserver.cpp')
| -rw-r--r-- | src/zenserver/proxy/zenproxyserver.cpp | 517 |
1 files changed, 517 insertions, 0 deletions
diff --git a/src/zenserver/proxy/zenproxyserver.cpp b/src/zenserver/proxy/zenproxyserver.cpp new file mode 100644 index 000000000..1fd9cd2c4 --- /dev/null +++ b/src/zenserver/proxy/zenproxyserver.cpp @@ -0,0 +1,517 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenproxyserver.h" + +#include "frontend/frontend.h" +#include "proxy/httpproxystats.h" + +#include <zenhttp/httpapiservice.h> + +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/windows.h> +#include <zenutil/service.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// Configurator + +void +ZenProxyServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("proxy", + "", + "proxy-map", + "Proxy mapping (see documentation for full format)", + cxxopts::value<std::vector<std::string>>(m_RawProxyMappings), + ""); + + Options.parse_positional({"proxy-map"}); + Options.show_positional_help(); +} + +void +ZenProxyServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenProxyServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenProxyServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +static ProxyMapping +ParseProxyMapping(const std::string& Raw) +{ + // Preferred format using "=" as the listen/target separator: + // listen_spec=target_spec + // where listen_spec is [addr:]port or unix:path + // and target_spec is host:port or unix:path + // + // Examples: + // 9000=127.0.0.1:8558 (TCP -> TCP) + // 10.0.0.1:9000=10.0.0.2:8558 (TCP -> TCP) + // 
9000=unix:/tmp/target.sock (TCP -> Unix) + // 9000=unix:C:\Users\foo\zen.sock (TCP -> Unix, Windows path) + // unix:/tmp/listen.sock=localhost:8558 (Unix -> TCP) + // unix:C:\foo\l.sock=unix:C:\foo\t.sock (Unix -> Unix, Windows paths) + // + // Legacy format using colon-only separators (no "=" present): + // [listen_addr:]listen_port:target_host:target_port (TCP -> TCP) + // [listen_addr:]listen_port:unix:target_socket_path (TCP -> Unix) + // unix:listen_socket_path:target_host:target_port (Unix -> TCP, path must not contain colons) + // unix:listen_socket_path:unix:target_socket_path (Unix -> Unix, listen path must not contain colons) + + auto ThrowBadMapping = [&](std::string_view Detail) { + throw OptionParseException(fmt::format("invalid proxy mapping '{}': {}", Raw, Detail), ""); + }; + + auto ParsePort = [&](std::string_view Field) -> uint16_t { + std::optional<uint16_t> Port = ParseInt<uint16_t>(Field); + if (!Port) + { + ThrowBadMapping(fmt::format("'{}' is not a valid port number", Field)); + } + return *Port; + }; + + auto RequireNonEmpty = [&](std::string_view Value, std::string_view Label) { + if (Value.empty()) + { + ThrowBadMapping(fmt::format("empty {}", Label)); + } + }; + + // Parse a listen spec: [addr:]port or unix:path + auto ParseListenSpec = [&](std::string_view Spec, ProxyMapping& Out) { + if (Spec.substr(0, 5) == "unix:") + { + Out.ListenUnixSocket = Spec.substr(5); + RequireNonEmpty(Out.ListenUnixSocket, "listen unix socket path"); + } + else + { + size_t ColonPos = Spec.find(':'); + if (ColonPos == std::string_view::npos) + { + Out.ListenPort = ParsePort(Spec); + } + else + { + Out.ListenAddress = Spec.substr(0, ColonPos); + Out.ListenPort = ParsePort(Spec.substr(ColonPos + 1)); + } + } + }; + + // Parse a target spec: host:port or unix:path + auto ParseTargetSpec = [&](std::string_view Spec, ProxyMapping& Out) { + if (Spec.substr(0, 5) == "unix:") + { + Out.TargetUnixSocket = Spec.substr(5); + RequireNonEmpty(Out.TargetUnixSocket, "target 
unix socket path"); + } + else + { + size_t ColonPos = Spec.rfind(':'); + if (ColonPos == std::string_view::npos) + { + ThrowBadMapping("target must be host:port or unix:path"); + } + Out.TargetHost = Spec.substr(0, ColonPos); + Out.TargetPort = ParsePort(Spec.substr(ColonPos + 1)); + } + }; + + ProxyMapping Mapping; + + // Check for the "=" separator first. + size_t EqPos = Raw.find('='); + if (EqPos != std::string::npos) + { + std::string_view ListenSpec = std::string_view(Raw).substr(0, EqPos); + std::string_view TargetSpec = std::string_view(Raw).substr(EqPos + 1); + + RequireNonEmpty(ListenSpec, "listen spec"); + RequireNonEmpty(TargetSpec, "target spec"); + + ParseListenSpec(ListenSpec, Mapping); + ParseTargetSpec(TargetSpec, Mapping); + return Mapping; + } + + // Legacy colon-only format. Extract fields left-to-right; when we encounter the + // "unix" keyword, everything after the next colon is the socket path taken verbatim. + // Listen-side unix socket paths must not contain colons in this format. + + auto RequireColon = [&](size_t From) -> size_t { + size_t Pos = Raw.find(':', From); + if (Pos == std::string::npos) + { + ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); + } + return Pos; + }; + + size_t Pos1 = RequireColon(0); + std::string Field1 = Raw.substr(0, Pos1); + + size_t Pos2 = RequireColon(Pos1 + 1); + std::string Field2 = Raw.substr(Pos1 + 1, Pos2 - Pos1 - 1); + + // unix:listen_path:... 
+ if (Field1 == "unix") + { + Mapping.ListenUnixSocket = Field2; + RequireNonEmpty(Mapping.ListenUnixSocket, "listen unix socket path"); + + ParseTargetSpec(std::string_view(Raw).substr(Pos2 + 1), Mapping); + return Mapping; + } + + // listen_port:unix:target_socket_path + if (Field2 == "unix") + { + Mapping.ListenPort = ParsePort(Field1); + Mapping.TargetUnixSocket = Raw.substr(Pos2 + 1); + RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); + return Mapping; + } + + size_t Pos3 = Raw.find(':', Pos2 + 1); + if (Pos3 == std::string::npos) + { + // listen_port:target_host:target_port + Mapping.ListenPort = ParsePort(Field1); + Mapping.TargetHost = Field2; + Mapping.TargetPort = ParsePort(std::string_view(Raw).substr(Pos2 + 1)); + return Mapping; + } + + std::string Field3 = Raw.substr(Pos2 + 1, Pos3 - Pos2 - 1); + + // listen_addr:listen_port:unix:target_socket_path + if (Field3 == "unix") + { + Mapping.ListenAddress = Field1; + Mapping.ListenPort = ParsePort(Field2); + Mapping.TargetUnixSocket = Raw.substr(Pos3 + 1); + RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); + return Mapping; + } + + // listen_addr:listen_port:target_host:target_port + std::string Field4 = Raw.substr(Pos3 + 1); + if (Field4.find(':') != std::string::npos) + { + ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); + } + + Mapping.ListenAddress = Field1; + Mapping.ListenPort = ParsePort(Field2); + Mapping.TargetHost = Field3; + Mapping.TargetPort = ParsePort(Field4); + return Mapping; +} + +void +ZenProxyServerConfigurator::ValidateOptions() +{ + if (m_ServerOptions.BasePort == 0) + { + m_ServerOptions.BasePort = ZenProxyServerConfig::kDefaultProxyPort; + } + + if (m_ServerOptions.DataDir.empty()) + { + std::filesystem::path SystemRoot = m_ServerOptions.SystemRootDir; + if (SystemRoot.empty()) + { + SystemRoot = PickDefaultSystemRootDirectory(); + } + if (!SystemRoot.empty()) + { + m_ServerOptions.DataDir = 
SystemRoot / "Proxy"; + } + } + + for (const std::string& Raw : m_RawProxyMappings) + { + // The mode keyword "proxy" from argv[1] gets captured as a positional + // argument — skip it. + if (Raw == "proxy") + { + continue; + } + + m_ServerOptions.ProxyMappings.push_back(ParseProxyMapping(Raw)); + } +} + +////////////////////////////////////////////////////////////////////////// +// ZenProxyServer + +ZenProxyServer::ZenProxyServer() +{ +} + +ZenProxyServer::~ZenProxyServer() +{ + Cleanup(); +} + +int +ZenProxyServer::Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenProxyServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in PROXY server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + for (const ProxyMapping& Mapping : ServerConfig.ProxyMappings) + { + auto Service = std::make_unique<TcpProxyService>(m_ProxyIoContext, Mapping); + Service->Start(); + m_ProxyServices.push_back(std::move(Service)); + } + + // Keep the io_context alive even when there is no pending work, so that + // worker threads don't exit prematurely between async operations. + m_ProxyIoWorkGuard = std::make_unique<asio::io_context::work>(m_ProxyIoContext); + + // Start proxy I/O worker threads. Use a modest thread count — proxy work is + // I/O-bound so we don't need a thread per core, but having more than one + // avoids head-of-line blocking when many connections are active. 
+ unsigned int ThreadCount = std::max(GetHardwareConcurrency() / 4, 4u); + + for (unsigned int i = 0; i < ThreadCount; ++i) + { + m_ProxyIoThreads.emplace_back([this, i] { + ExtendableStringBuilder<32> ThreadName; + ThreadName << "proxy_io_" << i; + SetCurrentThreadName(ThreadName); + m_ProxyIoContext.run(); + }); + } + + ZEN_INFO("proxy I/O thread pool started with {} threads", ThreadCount); + + m_ApiService = std::make_unique<HttpApiService>(*m_Http); + m_Http->RegisterService(*m_ApiService); + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + m_Http->RegisterService(*m_FrontendService); + + std::string DefaultRecordDir = (m_DataRoot / "recordings").string(); + m_ProxyStatsService = std::make_unique<HttpProxyStatsService>(m_ProxyServices, m_StatsService, std::move(DefaultRecordDir)); + m_Http->RegisterService(*m_ProxyStatsService); + + EnsureIoRunner(); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenProxyServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO(R"(__________ __________ )" "\n" + R"(\____ /____ ____ \______ \_______ _______ ______.__. 
)" "\n" + R"( / // __ \ / \ | ___/\_ __ \/ _ \ \/ < | | )" "\n" + R"( / /\ ___/| | \ | | | | \( <_> > < \___ | )" "\n" + R"(/_______ \___ >___| / |____| |__| \____/__/\_ \/ ____| )" "\n" + R"( \/ \/ \/ \/\/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as PROXY (pid: {})", GetCurrentProcessId()); + +#if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +#endif + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +#endif + + const bool IsInteractiveMode = IsInteractiveSession(); + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +void +ZenProxyServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenProxyServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + for (auto& Service : m_ProxyServices) + { + Service->Stop(); + } + + m_ProxyIoWorkGuard.reset(); + m_ProxyIoContext.stop(); + for (auto& Thread : m_ProxyIoThreads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_ProxyIoThreads.clear(); + m_ProxyServices.clear(); + + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + m_ProxyStatsService.reset(); + m_FrontendService.reset(); + m_ApiService.reset(); + + ShutdownServices(); + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +////////////////////////////////////////////////////////////////////////// +// ZenProxyServerMain + 
+ZenProxyServerMain::ZenProxyServerMain(ZenProxyServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) +{ +} + +void +ZenProxyServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenProxyServer Server; + Server.SetServerMode("Proxy"); + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + 
NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen |