aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/proxy/zenproxyserver.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-12 15:03:03 +0100
committerGitHub Enterprise <[email protected]>2026-03-12 15:03:03 +0100
commit81bc43aa96f0059cecb28d1bd88338b7d84667f9 (patch)
treea3428cb7fddceae0b284d33562af5bf3e64a367e /src/zenserver/proxy/zenproxyserver.cpp
parentupdate fmt 12.0.0 -> 12.1.0 (#828) (diff)
downloadzen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.tar.xz
zen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.zip
Transparent proxy mode (#823)
Adds a **transparent TCP proxy mode** to zenserver (activated via `zenserver proxy`), allowing it to sit between clients and upstream Zen servers to inspect and monitor HTTP/1.x traffic in real time. Primarily useful during development, to be able to observe multi-server/client interactions in one place. - **Dedicated proxy port** -- Proxy mode defaults to port 8118 with its own data directory to avoid collisions with a normal zenserver instance. - **TCP proxy core** (`src/zenserver/proxy/`) -- A new transparent TCP proxy that forwards connections to upstream targets, with support for both TCP/IP and Unix socket listeners. Multi-threaded I/O for connection handling. Supports Unix domain sockets for both upstream/downstream. - **HTTP traffic inspection** -- Parses HTTP/1.x request/response streams inline to extract method, path, status, content length, and WebSocket upgrades without breaking the proxied data. - **Proxy dashboard** -- A web UI showing live connection stats, per-target request counts, active connections, bytes transferred, and client IP/session ID rollups. - **Server mode display** -- Dashboard banner now shows the running server mode (Zen Proxy, Zen Compute, etc.). Supporting changes included in this branch: - **Wildcard log level matching** -- Log levels can now be set per-category using wildcard patterns (e.g. `proxy.*=debug`). - **`zen down --all`** -- New flag to shut down all running zenserver instances; also used by the new `xmake kill` task. - Minor test stability fixes (flaky hash collisions, per-thread RNG seeds). - Support ZEN_MALLOC environment variable for default allocator selection and switch default to rpmalloc - Fixed sentry-native build to allow LTO on Windows
Diffstat (limited to 'src/zenserver/proxy/zenproxyserver.cpp')
-rw-r--r--src/zenserver/proxy/zenproxyserver.cpp517
1 files changed, 517 insertions, 0 deletions
diff --git a/src/zenserver/proxy/zenproxyserver.cpp b/src/zenserver/proxy/zenproxyserver.cpp
new file mode 100644
index 000000000..1fd9cd2c4
--- /dev/null
+++ b/src/zenserver/proxy/zenproxyserver.cpp
@@ -0,0 +1,517 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenproxyserver.h"
+
+#include "frontend/frontend.h"
+#include "proxy/httpproxystats.h"
+
+#include <zenhttp/httpapiservice.h>
+
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
+#include <zencore/sentryintegration.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/windows.h>
+#include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+// Configurator
+
+void
+ZenProxyServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+ Options.add_option("proxy",
+ "",
+ "proxy-map",
+ "Proxy mapping (see documentation for full format)",
+ cxxopts::value<std::vector<std::string>>(m_RawProxyMappings),
+ "");
+
+ Options.parse_positional({"proxy-map"});
+ Options.show_positional_help();
+}
+
+void
+ZenProxyServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenProxyServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+void
+ZenProxyServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+ ZEN_UNUSED(LuaOptions);
+}
+
+static ProxyMapping
+ParseProxyMapping(const std::string& Raw)
+{
+ // Preferred format using "=" as the listen/target separator:
+ // listen_spec=target_spec
+ // where listen_spec is [addr:]port or unix:path
+ // and target_spec is host:port or unix:path
+ //
+ // Examples:
+ // 9000=127.0.0.1:8558 (TCP -> TCP)
+ // 10.0.0.1:9000=10.0.0.2:8558 (TCP -> TCP)
+ // 9000=unix:/tmp/target.sock (TCP -> Unix)
+ // 9000=unix:C:\Users\foo\zen.sock (TCP -> Unix, Windows path)
+ // unix:/tmp/listen.sock=localhost:8558 (Unix -> TCP)
+ // unix:C:\foo\l.sock=unix:C:\foo\t.sock (Unix -> Unix, Windows paths)
+ //
+ // Legacy format using colon-only separators (no "=" present):
+ // [listen_addr:]listen_port:target_host:target_port (TCP -> TCP)
+ // [listen_addr:]listen_port:unix:target_socket_path (TCP -> Unix)
+ // unix:listen_socket_path:target_host:target_port (Unix -> TCP, path must not contain colons)
+ // unix:listen_socket_path:unix:target_socket_path (Unix -> Unix, listen path must not contain colons)
+
+ auto ThrowBadMapping = [&](std::string_view Detail) {
+ throw OptionParseException(fmt::format("invalid proxy mapping '{}': {}", Raw, Detail), "");
+ };
+
+ auto ParsePort = [&](std::string_view Field) -> uint16_t {
+ std::optional<uint16_t> Port = ParseInt<uint16_t>(Field);
+ if (!Port)
+ {
+ ThrowBadMapping(fmt::format("'{}' is not a valid port number", Field));
+ }
+ return *Port;
+ };
+
+ auto RequireNonEmpty = [&](std::string_view Value, std::string_view Label) {
+ if (Value.empty())
+ {
+ ThrowBadMapping(fmt::format("empty {}", Label));
+ }
+ };
+
+ // Parse a listen spec: [addr:]port or unix:path
+ auto ParseListenSpec = [&](std::string_view Spec, ProxyMapping& Out) {
+ if (Spec.substr(0, 5) == "unix:")
+ {
+ Out.ListenUnixSocket = Spec.substr(5);
+ RequireNonEmpty(Out.ListenUnixSocket, "listen unix socket path");
+ }
+ else
+ {
+ size_t ColonPos = Spec.find(':');
+ if (ColonPos == std::string_view::npos)
+ {
+ Out.ListenPort = ParsePort(Spec);
+ }
+ else
+ {
+ Out.ListenAddress = Spec.substr(0, ColonPos);
+ Out.ListenPort = ParsePort(Spec.substr(ColonPos + 1));
+ }
+ }
+ };
+
+ // Parse a target spec: host:port or unix:path
+ auto ParseTargetSpec = [&](std::string_view Spec, ProxyMapping& Out) {
+ if (Spec.substr(0, 5) == "unix:")
+ {
+ Out.TargetUnixSocket = Spec.substr(5);
+ RequireNonEmpty(Out.TargetUnixSocket, "target unix socket path");
+ }
+ else
+ {
+ size_t ColonPos = Spec.rfind(':');
+ if (ColonPos == std::string_view::npos)
+ {
+ ThrowBadMapping("target must be host:port or unix:path");
+ }
+ Out.TargetHost = Spec.substr(0, ColonPos);
+ Out.TargetPort = ParsePort(Spec.substr(ColonPos + 1));
+ }
+ };
+
+ ProxyMapping Mapping;
+
+ // Check for the "=" separator first.
+ size_t EqPos = Raw.find('=');
+ if (EqPos != std::string::npos)
+ {
+ std::string_view ListenSpec = std::string_view(Raw).substr(0, EqPos);
+ std::string_view TargetSpec = std::string_view(Raw).substr(EqPos + 1);
+
+ RequireNonEmpty(ListenSpec, "listen spec");
+ RequireNonEmpty(TargetSpec, "target spec");
+
+ ParseListenSpec(ListenSpec, Mapping);
+ ParseTargetSpec(TargetSpec, Mapping);
+ return Mapping;
+ }
+
+ // Legacy colon-only format. Extract fields left-to-right; when we encounter the
+ // "unix" keyword, everything after the next colon is the socket path taken verbatim.
+ // Listen-side unix socket paths must not contain colons in this format.
+
+ auto RequireColon = [&](size_t From) -> size_t {
+ size_t Pos = Raw.find(':', From);
+ if (Pos == std::string::npos)
+ {
+ ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator");
+ }
+ return Pos;
+ };
+
+ size_t Pos1 = RequireColon(0);
+ std::string Field1 = Raw.substr(0, Pos1);
+
+ size_t Pos2 = RequireColon(Pos1 + 1);
+ std::string Field2 = Raw.substr(Pos1 + 1, Pos2 - Pos1 - 1);
+
+ // unix:listen_path:...
+ if (Field1 == "unix")
+ {
+ Mapping.ListenUnixSocket = Field2;
+ RequireNonEmpty(Mapping.ListenUnixSocket, "listen unix socket path");
+
+ ParseTargetSpec(std::string_view(Raw).substr(Pos2 + 1), Mapping);
+ return Mapping;
+ }
+
+ // listen_port:unix:target_socket_path
+ if (Field2 == "unix")
+ {
+ Mapping.ListenPort = ParsePort(Field1);
+ Mapping.TargetUnixSocket = Raw.substr(Pos2 + 1);
+ RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path");
+ return Mapping;
+ }
+
+ size_t Pos3 = Raw.find(':', Pos2 + 1);
+ if (Pos3 == std::string::npos)
+ {
+ // listen_port:target_host:target_port
+ Mapping.ListenPort = ParsePort(Field1);
+ Mapping.TargetHost = Field2;
+ Mapping.TargetPort = ParsePort(std::string_view(Raw).substr(Pos2 + 1));
+ return Mapping;
+ }
+
+ std::string Field3 = Raw.substr(Pos2 + 1, Pos3 - Pos2 - 1);
+
+ // listen_addr:listen_port:unix:target_socket_path
+ if (Field3 == "unix")
+ {
+ Mapping.ListenAddress = Field1;
+ Mapping.ListenPort = ParsePort(Field2);
+ Mapping.TargetUnixSocket = Raw.substr(Pos3 + 1);
+ RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path");
+ return Mapping;
+ }
+
+ // listen_addr:listen_port:target_host:target_port
+ std::string Field4 = Raw.substr(Pos3 + 1);
+ if (Field4.find(':') != std::string::npos)
+ {
+ ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator");
+ }
+
+ Mapping.ListenAddress = Field1;
+ Mapping.ListenPort = ParsePort(Field2);
+ Mapping.TargetHost = Field3;
+ Mapping.TargetPort = ParsePort(Field4);
+ return Mapping;
+}
+
+void
+ZenProxyServerConfigurator::ValidateOptions()
+{
+ if (m_ServerOptions.BasePort == 0)
+ {
+ m_ServerOptions.BasePort = ZenProxyServerConfig::kDefaultProxyPort;
+ }
+
+ if (m_ServerOptions.DataDir.empty())
+ {
+ std::filesystem::path SystemRoot = m_ServerOptions.SystemRootDir;
+ if (SystemRoot.empty())
+ {
+ SystemRoot = PickDefaultSystemRootDirectory();
+ }
+ if (!SystemRoot.empty())
+ {
+ m_ServerOptions.DataDir = SystemRoot / "Proxy";
+ }
+ }
+
+ for (const std::string& Raw : m_RawProxyMappings)
+ {
+ // The mode keyword "proxy" from argv[1] gets captured as a positional
+ // argument — skip it.
+ if (Raw == "proxy")
+ {
+ continue;
+ }
+
+ m_ServerOptions.ProxyMappings.push_back(ParseProxyMapping(Raw));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// ZenProxyServer
+
+ZenProxyServer::ZenProxyServer()
+{
+}
+
+ZenProxyServer::~ZenProxyServer()
+{
+ Cleanup();
+}
+
+int
+ZenProxyServer::Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenProxyServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ ZEN_INFO(ZEN_APP_NAME " initializing in PROXY server mode");
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ for (const ProxyMapping& Mapping : ServerConfig.ProxyMappings)
+ {
+ auto Service = std::make_unique<TcpProxyService>(m_ProxyIoContext, Mapping);
+ Service->Start();
+ m_ProxyServices.push_back(std::move(Service));
+ }
+
+ // Keep the io_context alive even when there is no pending work, so that
+ // worker threads don't exit prematurely between async operations.
+ m_ProxyIoWorkGuard = std::make_unique<asio::io_context::work>(m_ProxyIoContext);
+
+ // Start proxy I/O worker threads. Use a modest thread count — proxy work is
+ // I/O-bound so we don't need a thread per core, but having more than one
+ // avoids head-of-line blocking when many connections are active.
+ unsigned int ThreadCount = std::max(GetHardwareConcurrency() / 4, 4u);
+
+ for (unsigned int i = 0; i < ThreadCount; ++i)
+ {
+ m_ProxyIoThreads.emplace_back([this, i] {
+ ExtendableStringBuilder<32> ThreadName;
+ ThreadName << "proxy_io_" << i;
+ SetCurrentThreadName(ThreadName);
+ m_ProxyIoContext.run();
+ });
+ }
+
+ ZEN_INFO("proxy I/O thread pool started with {} threads", ThreadCount);
+
+ m_ApiService = std::make_unique<HttpApiService>(*m_Http);
+ m_Http->RegisterService(*m_ApiService);
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_Http->RegisterService(*m_FrontendService);
+
+ std::string DefaultRecordDir = (m_DataRoot / "recordings").string();
+ m_ProxyStatsService = std::make_unique<HttpProxyStatsService>(m_ProxyServices, m_StatsService, std::move(DefaultRecordDir));
+ m_Http->RegisterService(*m_ProxyStatsService);
+
+ EnsureIoRunner();
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+void
+ZenProxyServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ // clang-format off
+ ZEN_INFO(R"(__________ __________ )" "\n"
+ R"(\____ /____ ____ \______ \_______ _______ ______.__. )" "\n"
+ R"( / // __ \ / \ | ___/\_ __ \/ _ \ \/ < | | )" "\n"
+ R"( / /\ ___/| | \ | | | | \( <_> > < \___ | )" "\n"
+ R"(/_______ \___ >___| / |____| |__| \____/__/\_ \/ ____| )" "\n"
+ R"( \/ \/ \/ \/\/ )");
+ // clang-format on
+
+ ExtendableStringBuilder<256> BuildOptions;
+ GetBuildOptions(BuildOptions, '\n');
+ ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running as PROXY (pid: {})", GetCurrentProcessId());
+
+#if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+#endif
+
+#if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+#endif
+
+ const bool IsInteractiveMode = IsInteractiveSession();
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+void
+ZenProxyServer::Cleanup()
+{
+ ZEN_TRACE_CPU("ZenProxyServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ for (auto& Service : m_ProxyServices)
+ {
+ Service->Stop();
+ }
+
+ m_ProxyIoWorkGuard.reset();
+ m_ProxyIoContext.stop();
+ for (auto& Thread : m_ProxyIoThreads)
+ {
+ if (Thread.joinable())
+ {
+ Thread.join();
+ }
+ }
+ m_ProxyIoThreads.clear();
+ m_ProxyServices.clear();
+
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ m_ProxyStatsService.reset();
+ m_FrontendService.reset();
+ m_ApiService.reset();
+
+ ShutdownServices();
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// ZenProxyServerMain
+
+ZenProxyServerMain::ZenProxyServerMain(ZenProxyServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions)
+{
+}
+
+void
+ZenProxyServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenProxyServer Server;
+ Server.SetServerMode("Proxy");
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ if (EffectiveBasePort == -1)
+ {
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_mon");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ Server.SetIsReadyFunc([&] {
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen