diff options
| author | Stefan Boberg <[email protected]> | 2026-03-12 15:03:03 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-12 15:03:03 +0100 |
| commit | 81bc43aa96f0059cecb28d1bd88338b7d84667f9 (patch) | |
| tree | a3428cb7fddceae0b284d33562af5bf3e64a367e /src/zenserver | |
| parent | update fmt 12.0.0 -> 12.1.0 (#828) (diff) | |
| download | zen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.tar.xz zen-81bc43aa96f0059cecb28d1bd88338b7d84667f9.zip | |
Transparent proxy mode (#823)
Adds a **transparent TCP proxy mode** to zenserver (activated via `zenserver proxy`), allowing it to sit between clients and upstream Zen servers to inspect and monitor HTTP/1.x traffic in real time. Primarily useful during development, to be able to observe multi-server/client interactions in one place.
- **Dedicated proxy port** -- Proxy mode defaults to port 8118 with its own data directory to avoid collisions with a normal zenserver instance.
- **TCP proxy core** (`src/zenserver/proxy/`) -- A new transparent TCP proxy that forwards connections to upstream targets, with support for both TCP/IP and Unix socket listeners. Multi-threaded I/O for connection handling. Supports Unix domain sockets for both upstream/downstream.
- **HTTP traffic inspection** -- Parses HTTP/1.x request/response streams inline to extract method, path, status, content length, and WebSocket upgrades without breaking the proxied data.
- **Proxy dashboard** -- A web UI showing live connection stats, per-target request counts, active connections, bytes transferred, and client IP/session ID rollups.
- **Server mode display** -- Dashboard banner now shows the running server mode (Zen Proxy, Zen Compute, etc.).
Supporting changes included in this branch:
- **Wildcard log level matching** -- Log levels can now be set per-category using wildcard patterns (e.g. `proxy.*=debug`).
- **`zen down --all`** -- New flag to shut down all running zenserver instances; also used by the new `xmake kill` task.
- Minor test stability fixes (flaky hash collisions, per-thread RNG seeds).
- Support ZEN_MALLOC environment variable for default allocator selection and switch default to rpmalloc
- Fixed sentry-native build to allow LTO on Windows
Diffstat (limited to 'src/zenserver')
23 files changed, 2807 insertions, 26 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 2ac3de599..a02ca7be3 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -946,6 +946,7 @@ ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) ZEN_TRACE_CPU("ZenComputeServerMain::DoRun"); ZenComputeServer Server; + Server.SetServerMode("Compute"); Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); diff --git a/src/zenserver/config/config.cpp b/src/zenserver/config/config.cpp index ef9c6b7b8..858225032 100644 --- a/src/zenserver/config/config.cpp +++ b/src/zenserver/config/config.cpp @@ -18,6 +18,9 @@ #include <zencore/string.h> #include <zenutil/config/commandlineoptions.h> #include <zenutil/config/environmentoptions.h> +#include <zenutil/consoletui.h> + +#include <charconv> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> @@ -197,6 +200,7 @@ struct ZenServerCmdLineOptions std::string DataDir; std::string BaseSnapshotDir; std::string SecurityConfigPath; + std::string PortStr; ZenLoggingCmdLineOptions LoggingOptions; @@ -301,7 +305,7 @@ ZenServerCmdLineOptions::AddCliOptions(cxxopts::Options& options, ZenServerConfi "p", "port", "Select HTTP port", - cxxopts::value<int>(ServerOptions.BasePort)->default_value("8558"), + cxxopts::value<std::string>(PortStr)->default_value("auto"), "<port number>"); options.add_option("network", @@ -468,6 +472,18 @@ ZenServerCmdLineOptions::ApplyOptions(cxxopts::Options& options, ZenServerConfig ServerOptions.BaseSnapshotDir = MakeSafeAbsolutePath(BaseSnapshotDir); ServerOptions.SecurityConfigPath = MakeSafeAbsolutePath(SecurityConfigPath); + if (PortStr != "auto") + { + int Port = 0; + auto [Ptr, Ec] = std::from_chars(PortStr.data(), PortStr.data() + PortStr.size(), Port); + if (Ec != std::errc{} || Ptr != PortStr.data() + PortStr.size() || Port <= 0 || Port > 65535) + { + throw OptionParseException(fmt::format("invalid port '{}': expected 'auto' or a number between 1 and 65535", PortStr), + options.help()); + } + ServerOptions.BasePort = Port; + } + LoggingOptions.ApplyOptions(ServerOptions.LoggingConfig); #if ZEN_WITH_HTTPSYS @@ -545,6 +561,7 @@ ZenServerConfiguratorBase::Configure(int argc, char* argv[]) } cxxopts::Options options("zenserver", "Zen Storage Server"); + options.set_width(TuiConsoleColumns(80)); ZenServerCmdLineOptions BaseOptions; BaseOptions.AddCliOptions(options, m_ServerOptions); @@ -609,6 +626,14 @@ ZenServerConfiguratorBase::Configure(int argc, char* argv[]) } ValidateOptions(); // subclass validation + + // Resolve auto port: subclass ValidateOptions may have set a + // mode-specific default; if BasePort is still 0, fall back to the + // global default. + if (m_ServerOptions.BasePort == 0) + { + m_ServerOptions.BasePort = ZenServerConfig::kDefaultBasePort; + } } catch (const OptionParseException& e) { diff --git a/src/zenserver/config/config.h b/src/zenserver/config/config.h index 88226f810..e481c7225 100644 --- a/src/zenserver/config/config.h +++ b/src/zenserver/config/config.h @@ -50,18 +50,19 @@ struct ZenServerConfig ZenSentryConfig SentryConfig; ZenStatsConfig StatsConfig; ZenLoggingConfig LoggingConfig; - int BasePort = 8558; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - bool IsDebug = false; - bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not - bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization - bool IsTest = false; - bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) - int CoreLimit = 0; // If set, hardware concurrency queries are capped at this number - int LieCpu = 0; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - bool ShouldCrash = false; // Option for testing crash handling - bool IsFirstRun = false; + static constexpr int kDefaultBasePort = 8558; + int BasePort = 0; // Service listen port; 0 = auto (resolved per mode) + int OwnerPid = 0; // Parent process id (zero for standalone) + bool IsDebug = false; + bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not + bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization + bool IsTest = false; + bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) + int CoreLimit = 0; // If set, hardware concurrency queries are capped at this number + int LieCpu = 0; + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + bool ShouldCrash = false; // Option for testing crash handling + bool IsFirstRun = false; std::filesystem::path ConfigFile; // Path to Lua config file std::filesystem::path SystemRootDir; // System root directory (used for machine level config) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) diff --git a/src/zenserver/frontend/html.zip b/src/zenserver/frontend/html.zip Binary files differindex 84472ff08..58778a592 100644 --- a/src/zenserver/frontend/html.zip +++ b/src/zenserver/frontend/html.zip diff --git a/src/zenserver/frontend/html/pages/page.js b/src/zenserver/frontend/html/pages/page.js index dd8032c28..89a86a044 100644 --- a/src/zenserver/frontend/html/pages/page.js +++ b/src/zenserver/frontend/html/pages/page.js @@ -71,7 +71,7 @@ export class ZenPage extends PageBase add_branding(parent) { var banner = parent.tag("zen-banner"); - banner.attr("subtitle", "SERVER"); + banner.attr("subtitle", "Server"); banner.attr("tagline", "Local Storage Service"); banner.attr("logo-src", "favicon.ico"); banner.attr("load", "0"); @@ -80,6 +80,13 @@ export class ZenPage extends PageBase this._poll_status(); } + static _mode_taglines = { + "Server": "Local Storage Service", + "Proxy": "Proxy Service", + "Compute": "Compute Service", + "Hub": "Hub Service", + }; + async _poll_status() { try @@ -89,10 +96,18 @@ export class ZenPage extends PageBase { var obj = cbo.as_object(); - var hostname = obj.find("hostname"); - if (hostname) + var mode = obj.find("serverMode"); + if (mode) { - this._banner.attr("tagline", "Local Storage Service \u2014 " + hostname.as_value()); + var modeStr = mode.as_value(); + this._banner.attr("subtitle", modeStr); + var tagline = ZenPage._mode_taglines[modeStr] || modeStr; + + var hostname = obj.find("hostname"); + if (hostname) + tagline += " \u2014 " + hostname.as_value(); + + this._banner.attr("tagline", tagline); } var cpu = obj.find("cpuUsagePercent"); @@ -115,16 +130,17 @@ export class ZenPage extends PageBase // which links to show based on the services that are currently registered. const service_dashboards = [ - { base_uri: "/compute/", label: "Compute", href: "/dashboard/?page=compute" }, - { base_uri: "/orch/", label: "Orchestrator", href: "/dashboard/?page=orchestrator" }, - { base_uri: "/hub/", label: "Hub", href: "/dashboard/?page=hub" }, + { base_uri: "/sessions/", label: "Sessions", href: "/dashboard/?page=sessions" }, + { base_uri: "/z$/", label: "Cache", href: "/dashboard/?page=cache" }, + { base_uri: "/prj/", label: "Projects", href: "/dashboard/?page=projects" }, + { base_uri: "/compute/", label: "Compute", href: "/dashboard/?page=compute" }, + { base_uri: "/orch/", label: "Orchestrator", href: "/dashboard/?page=orchestrator" }, + { base_uri: "/hub/", label: "Hub", href: "/dashboard/?page=hub" }, + { base_uri: "/proxy/", label: "Proxy", href: "/dashboard/?page=proxy" }, ]; nav.tag("a").text("Home").attr("href", "/dashboard/"); - nav.tag("a").text("Sessions").attr("href", "/dashboard/?page=sessions"); - nav.tag("a").text("Cache").attr("href", "/dashboard/?page=cache"); - nav.tag("a").text("Projects").attr("href", "/dashboard/?page=projects"); this._info_link = nav.tag("a").text("Info").attr("href", "/dashboard/?page=info"); new Fetcher().resource("/api/").json().then((data) => { diff --git a/src/zenserver/frontend/html/pages/proxy.js b/src/zenserver/frontend/html/pages/proxy.js new file mode 100644 index 000000000..50e1f255a --- /dev/null +++ b/src/zenserver/frontend/html/pages/proxy.js @@ -0,0 +1,452 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +"use strict"; + +import { ZenPage } from "./page.js" +import { Fetcher } from "../util/fetcher.js" +import { Friendly } from "../util/friendly.js" +import { Table } from "../util/widgets.js" + +//////////////////////////////////////////////////////////////////////////////// +export class Page extends ZenPage +{ + generate_crumbs() {} + + async main() + { + this.set_title("proxy"); + + // Recording + const record_section = this.add_section("Recording"); + this._record_host = record_section; + this._init_record_controls(record_section); + + // Summary + const summary_section = this.add_section("Summary"); + this._summary_grid = summary_section.tag().classify("grid").classify("stats-tiles"); + + // Mappings + const mappings_section = this.add_section("Proxy Mappings"); + this._mappings_host = mappings_section; + this._mappings_table = null; + + // Active Connections + const connections_section = this.add_section("Active Connections"); + this._connections_host = connections_section; + this._connections_table = null; + + try { this._view_mode = localStorage.getItem("zen-proxy-view-mode") || "per-conn"; } catch (e) { this._view_mode = "per-conn"; } + this._init_view_tabs(connections_section); + + await this._update(); + this._connect_stats_ws(); + } + + async _update() + { + try + { + const data = await new Fetcher().resource("/proxy/stats").json(); + this._render_summary(data); + this._render_mappings(data); + this._render_connections(data); + } + catch (e) { /* service unavailable */ } + } + + _connect_stats_ws() + { + try + { + const proto = location.protocol === "https:" ? "wss:" : "ws:"; + const ws = new WebSocket(`${proto}//${location.host}/stats`); + + try { this._ws_paused = localStorage.getItem("zen-ws-paused") === "true"; } catch (e) { this._ws_paused = false; } + document.addEventListener("zen-ws-toggle", (e) => { + this._ws_paused = e.detail.paused; + }); + + ws.onmessage = (ev) => { + if (this._ws_paused) + { + return; + } + try + { + const all_stats = JSON.parse(ev.data); + const data = all_stats["proxy"]; + if (data) + { + this._render_summary(data); + this._render_mappings(data); + this._render_connections(data); + } + } + catch (e) { /* ignore parse errors */ } + }; + + ws.onclose = () => { this._stats_ws = null; }; + ws.onerror = () => { ws.close(); }; + + this._stats_ws = ws; + } + catch (e) { /* WebSocket not available */ } + } + + _init_record_controls(host) + { + const container = host.tag().classify("card"); + container.inner().style.display = "flex"; + container.inner().style.alignItems = "center"; + container.inner().style.gap = "12px"; + container.inner().style.padding = "12px 16px"; + + this._record_btn = document.createElement("button"); + this._record_btn.className = "history-tab"; + this._record_btn.textContent = "Start Recording"; + this._record_btn.addEventListener("click", () => this._toggle_recording()); + container.inner().appendChild(this._record_btn); + + this._record_status = document.createElement("span"); + this._record_status.style.fontSize = "0.85em"; + this._record_status.style.opacity = "0.7"; + this._record_status.textContent = "Off"; + container.inner().appendChild(this._record_status); + + this._recording = false; + } + + _update_record_ui(data) + { + const recording = !!data.recording; + this._recording = recording; + + this._record_btn.textContent = recording ? "Stop Recording" : "Start Recording"; + this._record_btn.classList.toggle("active", recording); + + const dir = data.recordDir || ""; + this._record_status.textContent = recording ? "Recording to: " + dir : "Off"; + } + + async _toggle_recording() + { + try + { + const endpoint = this._recording ? "/proxy/record/stop" : "/proxy/record/start"; + await fetch(endpoint, { method: "POST" }); + } + catch (e) { /* ignore */ } + } + + _render_summary(data) + { + this._update_record_ui(data); + + const grid = this._summary_grid; + grid.inner().innerHTML = ""; + + const mappings = data.mappings || []; + let totalActive = 0; + let totalPeak = 0; + let totalConn = 0; + let totalBytes = 0; + let totalRequestRate1 = 0; + let totalByteRate1 = 0; + let totalByteRate5 = 0; + + for (const m of mappings) + { + totalActive += (m.activeConnections || 0); + totalPeak += (m.peakActiveConnections || 0); + totalConn += (m.totalConnections || 0); + totalBytes += (m.bytesFromClient || 0) + (m.bytesToClient || 0); + totalRequestRate1 += (m.requestRate1 || 0); + totalByteRate1 += (m.byteRate1 || 0); + totalByteRate5 += (m.byteRate5 || 0); + } + + { + const tile = grid.tag().classify("card").classify("stats-tile"); + tile.tag().classify("card-title").text("Connections"); + const body = tile.tag().classify("tile-metrics"); + this._metric(body, Friendly.sep(totalActive), "currently open", true); + this._metric(body, Friendly.sep(totalPeak), "peak"); + this._metric(body, Friendly.sep(totalConn), "total since startup"); + } + + { + const tile = grid.tag().classify("card").classify("stats-tile"); + tile.tag().classify("card-title").text("Throughput"); + const body = tile.tag().classify("tile-metrics"); + this._metric(body, Friendly.sep(totalRequestRate1, 1) + "/s", "req/sec (1m)", true); + this._metric(body, Friendly.bytes(totalByteRate1) + "/s", "bandwidth (1m)", true); + this._metric(body, Friendly.bytes(totalByteRate5) + "/s", "bandwidth (5m)"); + this._metric(body, Friendly.bytes(totalBytes), "total transferred"); + } + } + + _render_mappings(data) + { + const mappings = data.mappings || []; + + if (this._mappings_table) + { + this._mappings_table.clear(); + } + else + { + this._mappings_table = this._mappings_host.add_widget( + Table, + ["listen", "target", "active", "peak", "total", "from client", "to client"], + Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric, -1 + ); + } + + for (const m of mappings) + { + this._mappings_table.add_row( + m.listen || "", + m.target || "", + Friendly.sep(m.activeConnections || 0), + Friendly.sep(m.peakActiveConnections || 0), + Friendly.sep(m.totalConnections || 0), + Friendly.bytes(m.bytesFromClient || 0), + Friendly.bytes(m.bytesToClient || 0), + ); + } + } + + _init_view_tabs(host) + { + const tabs_el = document.createElement("div"); + tabs_el.className = "history-tabs"; + tabs_el.style.marginBottom = "8px"; + tabs_el.style.width = "fit-content"; + host.tag().inner().appendChild(tabs_el); + + this._view_tabs = {}; + const make_tab = (label, mode) => { + const btn = document.createElement("button"); + btn.className = "history-tab"; + btn.textContent = label; + btn.addEventListener("click", () => { + if (this._view_mode === mode) { return; } + this._view_mode = mode; + try { localStorage.setItem("zen-proxy-view-mode", mode); } catch (e) {} + this._update_active_tab(); + if (this._connections_table) + { + this._connections_table.destroy(); + this._connections_table = null; + } + if (this._last_data) + { + this._render_connections(this._last_data); + } + }); + tabs_el.appendChild(btn); + this._view_tabs[mode] = btn; + }; + + make_tab("Per Connection", "per-conn"); + make_tab("Group by IP", "by-ip"); + make_tab("Group by Session", "by-session"); + this._update_active_tab(); + } + + _update_active_tab() + { + for (const [mode, btn] of Object.entries(this._view_tabs)) + { + btn.classList.toggle("active", this._view_mode === mode); + } + } + + _render_connections(data) + { + this._last_data = data; + + const mappings = data.mappings || []; + let connections = []; + for (const m of mappings) + { + for (const c of (m.connections || [])) + { + connections.push(c); + } + } + + if (this._view_mode === "by-ip") + { + this._render_connections_grouped_ip(connections); + } + else if (this._view_mode === "by-session") + { + this._render_connections_grouped_session(connections); + } + else + { + this._render_connections_flat(connections); + } + } + + _render_connections_flat(connections) + { + if (this._connections_table) + { + this._connections_table.clear(); + } + else + { + this._connections_table = this._connections_host.add_widget( + Table, + ["client", "session", "target", "requests", "from client", "to client", "duration"], + Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric, -1 + ); + } + + for (const c of connections) + { + const row = this._connections_table.add_row( + c.client || "", + c.sessionId || "", + c.target || "", + Friendly.sep(c.requests || 0), + Friendly.bytes(c.bytesFromClient || 0), + Friendly.bytes(c.bytesToClient || 0), + Friendly.duration((c.durationMs || 0) / 1000), + ); + row.get_cell(0).style("textAlign", "left"); + row.get_cell(1).style("textAlign", "left"); + row.get_cell(2).style("textAlign", "left"); + if (c.websocket) + { + this._append_badge(row.get_cell(0), "WS"); + } + } + } + + _append_badge(cell, text) + { + const badge = document.createElement("span"); + badge.className = "detail-tag"; + badge.style.marginLeft = "6px"; + badge.style.background = "color-mix(in srgb, var(--theme_p0) 15%, transparent)"; + badge.style.color = "var(--theme_p0)"; + badge.textContent = text; + cell.inner().appendChild(badge); + } + + _render_connections_grouped_ip(connections) + { + if (this._connections_table) + { + this._connections_table.clear(); + } + else + { + this._connections_table = this._connections_host.add_widget( + Table, + ["client ip", "conns", "requests", "from client", "to client", "max duration"], + Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric, -1 + ); + } + + const groups = new Map(); + for (const c of connections) + { + const ip = (c.client || "").replace(/:\d+$/, ""); + let group = groups.get(ip); + if (!group) + { + group = { ip: ip, conns: 0, wsConns: 0, requests: 0, bytesFromClient: 0, bytesToClient: 0, maxDurationMs: 0 }; + groups.set(ip, group); + } + group.conns++; + if (c.websocket) { group.wsConns++; } + group.requests += (c.requests || 0); + group.bytesFromClient += (c.bytesFromClient || 0); + group.bytesToClient += (c.bytesToClient || 0); + group.maxDurationMs = Math.max(group.maxDurationMs, c.durationMs || 0); + } + + for (const g of groups.values()) + { + const row = this._connections_table.add_row( + g.ip, + Friendly.sep(g.conns), + Friendly.sep(g.requests), + Friendly.bytes(g.bytesFromClient), + Friendly.bytes(g.bytesToClient), + Friendly.duration(g.maxDurationMs / 1000), + ); + row.get_cell(0).style("textAlign", "left"); + if (g.wsConns > 0) + { + this._append_badge(row.get_cell(0), g.wsConns === 1 ? "WS" : `${g.wsConns} WS`); + } + } + } + + _render_connections_grouped_session(connections) + { + if (this._connections_table) + { + this._connections_table.clear(); + } + else + { + this._connections_table = this._connections_host.add_widget( + Table, + ["session", "conns", "requests", "from client", "to client", "max duration"], + Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric, -1 + ); + } + + const groups = new Map(); + for (const c of connections) + { + const sid = c.sessionId || "(none)"; + let group = groups.get(sid); + if (!group) + { + group = { sessionId: sid, conns: 0, wsConns: 0, requests: 0, bytesFromClient: 0, bytesToClient: 0, maxDurationMs: 0 }; + groups.set(sid, group); + } + group.conns++; + if (c.websocket) { group.wsConns++; } + group.requests += (c.requests || 0); + group.bytesFromClient += (c.bytesFromClient || 0); + group.bytesToClient += (c.bytesToClient || 0); + group.maxDurationMs = Math.max(group.maxDurationMs, c.durationMs || 0); + } + + for (const g of groups.values()) + { + const row = this._connections_table.add_row( + g.sessionId, + Friendly.sep(g.conns), + Friendly.sep(g.requests), + Friendly.bytes(g.bytesFromClient), + Friendly.bytes(g.bytesToClient), + Friendly.duration(g.maxDurationMs / 1000), + ); + row.get_cell(0).style("textAlign", "left"); + if (g.wsConns > 0) + { + this._append_badge(row.get_cell(0), g.wsConns === 1 ? "WS" : `${g.wsConns} WS`); + } + } + } + + _metric(parent, value, label, hero = false) + { + const m = parent.tag().classify("tile-metric"); + if (hero) + { + m.classify("tile-metric-hero"); + } + m.tag().classify("metric-value").text(value); + m.tag().classify("metric-label").text(label); + } +} diff --git a/src/zenserver/frontend/html/pages/start.js b/src/zenserver/frontend/html/pages/start.js index 3a68a725d..580045060 100644 --- a/src/zenserver/frontend/html/pages/start.js +++ b/src/zenserver/frontend/html/pages/start.js @@ -262,6 +262,30 @@ export class Page extends ZenPage tile.on_click(() => { window.location = "?page=stat&provider=builds"; }); } + // Proxy tile + if (all_stats["proxy"]) + { + const s = all_stats["proxy"]; + const tile = grid.tag().classify("card").classify("stats-tile"); + tile.tag().classify("card-title").text("Proxy"); + const body = tile.tag().classify("tile-metrics"); + + const mappings = s.mappings || []; + let totalActive = 0; + let totalBytes = 0; + for (const m of mappings) + { + totalActive += (m.activeConnections || 0); + totalBytes += (m.bytesFromClient || 0) + (m.bytesToClient || 0); + } + + this._add_tile_metric(body, Friendly.sep(totalActive), "active connections", true); + this._add_tile_metric(body, Friendly.sep(mappings.length), "mappings"); + this._add_tile_metric(body, Friendly.bytes(totalBytes), "traffic"); + + tile.on_click(() => { window.location = "?page=proxy"; }); + } + // Workspace tile (ws) if (all_stats["ws"]) { diff --git a/src/zenserver/frontend/html/zen.css b/src/zenserver/frontend/html/zen.css index a968aecab..74336f0e1 100644 --- a/src/zenserver/frontend/html/zen.css +++ b/src/zenserver/frontend/html/zen.css @@ -1062,7 +1062,9 @@ tr:last-child td { .history-tab { background: transparent; + border: none; color: var(--theme_g1); + cursor: pointer; font-size: 12px; font-weight: 600; padding: 4px 12px; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index a019ff295..696991403 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -400,6 +400,7 @@ void ZenHubServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { ZenHubServer Server; + Server.SetServerMode("Hub"); Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 09ecc48e5..26ae85ae1 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -31,6 +31,8 @@ #include "hub/zenhubserver.h" +#include "proxy/zenproxyserver.h" + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> # include <zenutil/windows/windowsservice.h> @@ -291,6 +293,7 @@ main(int argc, char* argv[]) kHub, kStore, kCompute, + kProxy, kTest } ServerMode = kStore; @@ -308,6 +311,10 @@ main(int argc, char* argv[]) { ServerMode = kCompute; } + else if (argv[1] == "proxy"sv) + { + ServerMode = kProxy; + } else if (argv[1] == "test"sv) { ServerMode = kTest; @@ -333,6 +340,8 @@ main(int argc, char* argv[]) fprintf(stderr, "compute services are not compiled in!\n"); exit(5); #endif + case kProxy: + return AppMain<ZenProxyServerMain>(argc, argv); default: case kStore: return AppMain<ZenStorageServerMain>(argc, argv); diff --git a/src/zenserver/proxy/httpproxystats.cpp b/src/zenserver/proxy/httpproxystats.cpp new file mode 100644 index 000000000..6aa3e5c9b --- /dev/null +++ b/src/zenserver/proxy/httpproxystats.cpp @@ -0,0 +1,234 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpproxystats.h" + +#include "tcpproxy.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> + +#include <chrono> +#include <filesystem> + +namespace zen { + +HttpProxyStatsService::HttpProxyStatsService(const std::vector<std::unique_ptr<TcpProxyService>>& ProxyServices, + IHttpStatsService& StatsService, + std::string DefaultRecordDir) +: m_ProxyServices(ProxyServices) +, m_StatsService(StatsService) +, m_DefaultRecordDir(std::move(DefaultRecordDir)) +{ + m_StatsService.RegisterHandler("proxy", *this); +} + +HttpProxyStatsService::~HttpProxyStatsService() +{ + m_StatsService.UnregisterHandler("proxy", *this); +} + +const char* +HttpProxyStatsService::BaseUri() const +{ + return "/proxy/"; +} + +void +HttpProxyStatsService::HandleRequest(HttpServerRequest& Request) +{ + std::string_view Uri = Request.RelativeUri(); + + if (Uri == "stats" || Uri == "stats/") + { + HandleStatsRequest(Request); + } + else if (Uri == "record/start" || Uri == "record/start/") + { + HandleRecordStart(Request); + } + else if (Uri == "record/stop" || Uri == "record/stop/") + { + HandleRecordStop(Request); + } + else if (Uri == "record" || Uri == "record/") + { + HandleRecordStatus(Request); + } + else + { + Request.WriteResponse(HttpResponseCode::NotFound); + } +} + +void +HttpProxyStatsService::HandleRecordStart(HttpServerRequest& Request) +{ + if (Request.RequestVerb() != HttpVerb::kPost) + { + Request.WriteResponse(HttpResponseCode::MethodNotAllowed); + return; + } + + auto Params = Request.GetQueryParams(); + std::string_view Dir = Params.GetValue("dir"); + + std::string RecordDir; + if (Dir.empty()) + { + RecordDir = m_DefaultRecordDir; + } + else + { + // Treat dir as a subdirectory name within the default record directory. + // Reject path separators and parent references to prevent path traversal. + if (Dir.find("..") != std::string_view::npos || Dir.find('/') != std::string_view::npos || Dir.find('\\') != std::string_view::npos) + { + Request.WriteResponse(HttpResponseCode::BadRequest); + return; + } + RecordDir = (std::filesystem::path(m_DefaultRecordDir) / std::string(Dir)).string(); + } + + for (const std::unique_ptr<TcpProxyService>& Service : m_ProxyServices) + { + Service->SetRecording(true, RecordDir); + } + + CbObjectWriter Cbo; + Cbo << "recording" << true; + Cbo << "dir" << std::string_view(RecordDir); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpProxyStatsService::HandleRecordStop(HttpServerRequest& Request) +{ + if (Request.RequestVerb() != HttpVerb::kPost) + { + Request.WriteResponse(HttpResponseCode::MethodNotAllowed); + return; + } + + for (const std::unique_ptr<TcpProxyService>& Service : m_ProxyServices) + { + Service->SetRecording(false, Service->GetRecordDir()); + } + + CbObjectWriter Cbo; + Cbo << "recording" << false; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpProxyStatsService::HandleRecordStatus(HttpServerRequest& Request) +{ + bool IsRecording = false; + std::string RecordDir; + for (const std::unique_ptr<TcpProxyService>& Service : m_ProxyServices) + { + if (Service->IsRecording()) + { + IsRecording = true; + RecordDir = Service->GetRecordDir(); + break; + } + } + + CbObjectWriter Cbo; + Cbo << "recording" << IsRecording; + Cbo << "dir" << std::string_view(RecordDir); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +CbObject +HttpProxyStatsService::CollectStats() +{ + CbObjectWriter Cbo; + + // Include recording status in stats output. + { + bool IsRecording = false; + std::string RecordDir; + for (const std::unique_ptr<TcpProxyService>& Service : m_ProxyServices) + { + if (Service->IsRecording()) + { + IsRecording = true; + RecordDir = Service->GetRecordDir(); + break; + } + } + Cbo << "recording" << IsRecording; + Cbo << "recordDir" << std::string_view(RecordDir); + } + + Cbo.BeginArray("mappings"); + for (const std::unique_ptr<TcpProxyService>& Service : m_ProxyServices) + { + const ProxyMapping& Mapping = Service->GetMapping(); + + Cbo.BeginObject(); + { + std::string ListenAddr = Mapping.ListenDescription(); + Cbo << "listen" << std::string_view(ListenAddr); + + std::string TargetAddr = Mapping.TargetDescription(); + Cbo << "target" << std::string_view(TargetAddr); + + Cbo << "activeConnections" << Service->GetActiveConnections(); + Cbo << "peakActiveConnections" << Service->GetPeakActiveConnections(); + Cbo << "totalConnections" << Service->GetTotalConnections(); + Cbo << "bytesFromClient" << Service->GetTotalBytesFromClient(); + Cbo << "bytesToClient" << Service->GetTotalBytesToClient(); + + Cbo << "requestRate1" << Service->GetRequestMeter().Rate1(); + Cbo << "byteRate1" << Service->GetBytesMeter().Rate1(); + Cbo << "byteRate5" << Service->GetBytesMeter().Rate5(); + + auto Now = std::chrono::steady_clock::now(); + auto Sessions = Service->GetActiveSessions(); + + Cbo.BeginArray("connections"); + for (const auto& Session : Sessions) + { + Cbo.BeginObject(); + { + std::string ClientLabel = Session->GetClientLabel(); + Cbo << "client" << std::string_view(ClientLabel); + + std::string TargetLabel = Mapping.TargetDescription(); + Cbo << "target" << std::string_view(TargetLabel); + + Cbo << "bytesFromClient" << Session->GetBytesFromClient(); + Cbo << "bytesToClient" << Session->GetBytesToClient(); + + Cbo << "requests" << Session->GetRequestCount(); + Cbo << "websocket" << Session->IsWebSocket(); + + if (Session->HasSessionId()) + { + std::string SessionId = Session->GetSessionId().ToString(); + Cbo << "sessionId" << std::string_view(SessionId); + } + + double DurationMs = std::chrono::duration<double, std::milli>(Now - Session->GetStartTime()).count(); + Cbo << "durationMs" << DurationMs; + } + Cbo.EndObject(); + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); + + return Cbo.Save(); +} + +void +HttpProxyStatsService::HandleStatsRequest(HttpServerRequest& Request) +{ + Request.WriteResponse(HttpResponseCode::OK, CollectStats()); +} + +} // namespace zen diff --git a/src/zenserver/proxy/httpproxystats.h b/src/zenserver/proxy/httpproxystats.h new file mode 100644 index 000000000..76ac7c875 --- /dev/null +++ b/src/zenserver/proxy/httpproxystats.h @@ -0,0 +1,39 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include <memory> +#include <vector> + +namespace zen { + +class TcpProxyService; + +class HttpProxyStatsService : public HttpService, public IHttpStatsProvider +{ +public: + HttpProxyStatsService(const std::vector<std::unique_ptr<TcpProxyService>>& ProxyServices, + IHttpStatsService& StatsService, + std::string DefaultRecordDir); + ~HttpProxyStatsService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + // IHttpStatsProvider + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual CbObject CollectStats() override; + +private: + void HandleRecordStart(HttpServerRequest& Request); + void HandleRecordStop(HttpServerRequest& Request); + void HandleRecordStatus(HttpServerRequest& Request); + + const std::vector<std::unique_ptr<TcpProxyService>>& m_ProxyServices; + IHttpStatsService& m_StatsService; + std::string m_DefaultRecordDir; +}; + +} // namespace zen diff --git a/src/zenserver/proxy/httptrafficinspector.cpp b/src/zenserver/proxy/httptrafficinspector.cpp new file mode 100644 index 000000000..74ecbfd48 --- /dev/null +++ b/src/zenserver/proxy/httptrafficinspector.cpp @@ -0,0 +1,197 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httptrafficinspector.h" + +#include <zencore/logging.h> +#include <zencore/string.h> + +#include <charconv> + +namespace zen { + +// clang-format off +http_parser_settings HttpTrafficInspector::s_RequestSettings{ + .on_message_begin = [](http_parser*) { return 0; }, + .on_url = [](http_parser* p, const char* Data, size_t Len) { return GetThis(p)->OnUrl(Data, Len); }, + .on_status = [](http_parser*, const char*, size_t) { return 0; }, + .on_header_field = [](http_parser* p, const char* Data, size_t Len) { return GetThis(p)->OnHeaderField(Data, Len); }, + .on_header_value = [](http_parser* p, const char* Data, size_t Len) { return GetThis(p)->OnHeaderValue(Data, Len); }, + .on_headers_complete = [](http_parser* p) { return GetThis(p)->OnHeadersComplete(); }, + .on_body = [](http_parser*, const char*, size_t) { return 0; }, + .on_message_complete = [](http_parser* p) { return GetThis(p)->OnMessageComplete(); }, + .on_chunk_header{}, + .on_chunk_complete{}}; + +http_parser_settings HttpTrafficInspector::s_ResponseSettings{ + .on_message_begin = [](http_parser*) { return 0; }, + .on_url = [](http_parser*, const char*, size_t) { return 0; }, + .on_status = [](http_parser*, const char*, size_t) { return 0; }, + .on_header_field = [](http_parser* p, const char* Data, size_t Len) { return GetThis(p)->OnHeaderField(Data, Len); }, + .on_header_value = [](http_parser* p, const char* Data, size_t Len) { return GetThis(p)->OnHeaderValue(Data, Len); }, + .on_headers_complete = [](http_parser* p) { return GetThis(p)->OnHeadersComplete(); }, + .on_body = [](http_parser*, const char*, size_t) { return 0; }, + .on_message_complete = [](http_parser* p) { return GetThis(p)->OnMessageComplete(); }, + .on_chunk_header{}, + .on_chunk_complete{}}; +// clang-format on + +HttpTrafficInspector::HttpTrafficInspector(Direction Dir, std::string_view SessionLabel) +: m_Log(logging::Get("proxy.http")) +, m_Direction(Dir) +, m_SessionLabel(SessionLabel) +{ + http_parser_init(&m_Parser, Dir == Direction::Request ? HTTP_REQUEST : HTTP_RESPONSE); + m_Parser.data = this; +} + +void +HttpTrafficInspector::Inspect(const char* Data, size_t Length) +{ + if (m_Disabled) + { + return; + } + + http_parser_settings* Settings = (m_Direction == Direction::Request) ? &s_RequestSettings : &s_ResponseSettings; + + size_t Parsed = http_parser_execute(&m_Parser, Settings, Data, Length); + + if (m_Parser.upgrade) + { + if (m_Direction == Direction::Request) + { + ZEN_DEBUG("[{}] >> {} {} (upgrade to WebSocket)", m_SessionLabel, m_Method, m_Url); + } + else + { + ZEN_DEBUG("[{}] << {} (upgrade to WebSocket)", m_SessionLabel, m_StatusCode); + } + ResetMessageState(); + m_Upgraded.store(true, std::memory_order_relaxed); + m_Disabled = true; + return; + } + + http_errno Error = HTTP_PARSER_ERRNO(&m_Parser); + if (Error != HPE_OK) + { + ZEN_DEBUG("[{}] non-HTTP traffic detected ({}), disabling inspection", m_SessionLabel, http_errno_name(Error)); + m_Disabled = true; + } + else if (Parsed != Length) + { + ZEN_DEBUG("[{}] parser consumed {}/{} bytes, disabling inspection", m_SessionLabel, Parsed, Length); + m_Disabled = true; + } +} + +int +HttpTrafficInspector::OnUrl(const char* Data, size_t Length) +{ + m_Url.append(Data, Length); + return 0; +} + +int +HttpTrafficInspector::OnHeaderField(const char* Data, size_t Length) +{ + m_CurrentHeaderField.assign(Data, Length); + return 0; +} + +int +HttpTrafficInspector::OnHeaderValue(const char* Data, size_t Length) +{ + if (m_CurrentHeaderField.size() == 14 && StrCaseCompare(m_CurrentHeaderField.c_str(), "Content-Length", 14) == 0) + { + int64_t Value = 0; + std::from_chars(Data, Data + Length, Value); + m_ContentLength = Value; + } + else if (!m_SessionIdCaptured && m_CurrentHeaderField.size() == 10 && + StrCaseCompare(m_CurrentHeaderField.c_str(), "UE-Session", 10) == 0) + { + Oid Parsed; + if (Oid::TryParse(std::string_view(Data, Length), Parsed)) + { + m_SessionId = Parsed; + m_SessionIdCaptured = true; + } + } + m_CurrentHeaderField.clear(); + return 0; +} + +int +HttpTrafficInspector::OnHeadersComplete() +{ + if (m_Direction == Direction::Request) + { + m_Method = http_method_str(static_cast<http_method>(m_Parser.method)); + } + else + { + m_StatusCode = m_Parser.status_code; + } + return 0; +} + +int +HttpTrafficInspector::OnMessageComplete() +{ + if (m_Direction == Direction::Request) + { + if (m_ContentLength >= 0) + { + ZEN_DEBUG("[{}] >> {} {} (content-length: {})", m_SessionLabel, m_Method, m_Url, m_ContentLength); + } + else + { + ZEN_DEBUG("[{}] >> {} {}", m_SessionLabel, m_Method, m_Url); + } + } + else + { + if (m_ContentLength >= 0) + { + ZEN_DEBUG("[{}] << {} (content-length: {})", m_SessionLabel, m_StatusCode, m_ContentLength); + } + else + { + ZEN_DEBUG("[{}] << {}", m_SessionLabel, m_StatusCode); + } + } + + if (m_Observer) + { + m_Observer->OnMessageComplete(m_Direction, m_Method, m_Url, m_StatusCode, m_ContentLength); + } + + m_MessageCount.fetch_add(1, std::memory_order_relaxed); + ResetMessageState(); + return 0; +} + +Oid +HttpTrafficInspector::GetSessionId() const +{ + return m_SessionId; +} + +bool +HttpTrafficInspector::HasSessionId() const +{ + return m_SessionIdCaptured; +} + +void +HttpTrafficInspector::ResetMessageState() +{ + m_Url.clear(); + m_Method.clear(); + m_StatusCode = 0; + m_ContentLength = -1; + m_CurrentHeaderField.clear(); +} + +} // namespace zen diff --git a/src/zenserver/proxy/httptrafficinspector.h b/src/zenserver/proxy/httptrafficinspector.h new file mode 100644 index 000000000..f4af0e77e --- /dev/null +++ b/src/zenserver/proxy/httptrafficinspector.h @@ -0,0 +1,85 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zencore/uid.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <http_parser.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <cstdint> +#include <string> +#include <string_view> + +namespace zen { + +class HttpTrafficInspector +{ +public: + enum class Direction + { + Request, + Response + }; + + HttpTrafficInspector(Direction Dir, std::string_view SessionLabel); + + void Inspect(const char* Data, size_t Length); + + uint64_t GetMessageCount() const { return m_MessageCount.load(std::memory_order_relaxed); } + bool IsUpgraded() const { return m_Upgraded.load(std::memory_order_relaxed); } + Oid GetSessionId() const; + bool HasSessionId() const; + + void SetObserver(class IHttpTrafficObserver* Observer) { m_Observer = Observer; } + +private: + int OnUrl(const char* Data, size_t Length); + int OnHeaderField(const char* Data, size_t Length); + int OnHeaderValue(const char* Data, size_t Length); + int OnHeadersComplete(); + int OnMessageComplete(); + + void ResetMessageState(); + + static HttpTrafficInspector* GetThis(http_parser* Parser) { return static_cast<HttpTrafficInspector*>(Parser->data); } + + static http_parser_settings s_RequestSettings; + static http_parser_settings s_ResponseSettings; + + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + http_parser m_Parser; + Direction m_Direction; + std::string m_SessionLabel; + bool m_Disabled = false; + + // Per-message state + std::string m_Url; + std::string m_Method; + uint16_t m_StatusCode = 0; + int64_t m_ContentLength = -1; + std::string m_CurrentHeaderField; + std::atomic<uint64_t> m_MessageCount{0}; + std::atomic<bool> m_Upgraded{false}; + Oid m_SessionId = Oid::Zero; + bool m_SessionIdCaptured = false; + IHttpTrafficObserver* m_Observer = nullptr; +}; + +class IHttpTrafficObserver +{ +public: + virtual ~IHttpTrafficObserver() = default; + virtual void OnMessageComplete(HttpTrafficInspector::Direction Dir, + std::string_view Method, + std::string_view Url, + uint16_t StatusCode, + int64_t ContentLength) = 0; +}; + +} // namespace zen diff --git a/src/zenserver/proxy/httptrafficrecorder.cpp b/src/zenserver/proxy/httptrafficrecorder.cpp new file mode 100644 index 000000000..0279555a0 --- /dev/null +++ b/src/zenserver/proxy/httptrafficrecorder.cpp @@ -0,0 +1,191 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httptrafficrecorder.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/logging.h> + +#include <chrono> +#include <filesystem> + +namespace zen { + +HttpTrafficRecorder::HttpTrafficRecorder(const std::filesystem::path& OutputDir, std::string_view ClientLabel, std::string_view TargetLabel) +: m_Log(logging::Get("proxy.record")) +, m_Dir(OutputDir) +, m_ClientLabel(ClientLabel) +, m_TargetLabel(TargetLabel) +{ + auto Now = std::chrono::system_clock::now(); + m_StartTimeMs = uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(Now.time_since_epoch()).count()); + + std::error_code Ec; + std::filesystem::create_directories(m_Dir, Ec); + if (Ec) + { + ZEN_WARN("failed to create recording directory {} - {}", m_Dir.string(), Ec.message()); + return; + } + + std::error_code ReqEc; + m_RequestFile.Open(m_Dir / "request.bin", BasicFile::Mode::kTruncate, ReqEc); + if (ReqEc) + { + ZEN_WARN("failed to open request.bin in {} - {}", m_Dir.string(), ReqEc.message()); + return; + } + + std::error_code RespEc; + m_ResponseFile.Open(m_Dir / "response.bin", BasicFile::Mode::kTruncate, RespEc); + if (RespEc) + { + ZEN_WARN("failed to open response.bin in {} - {}", m_Dir.string(), RespEc.message()); + m_RequestFile.Close(); + return; + } + + m_Valid = true; + ZEN_DEBUG("recording started in {}", m_Dir.string()); +} + +HttpTrafficRecorder::~HttpTrafficRecorder() +{ + if (m_Valid && !m_Finalized) + { + Finalize(false, Oid::Zero); + } +} + +void +HttpTrafficRecorder::WriteRequest(const char* Data, size_t Length) +{ + if (!m_Valid) + { + return; + } + m_RequestFile.Write(Data, Length, m_RequestOffset); + m_RequestOffset += Length; +} + +void +HttpTrafficRecorder::WriteResponse(const char* Data, size_t Length) +{ + if (!m_Valid) + { + return; + } + m_ResponseFile.Write(Data, Length, m_ResponseOffset); + m_ResponseOffset += Length; +} + +void +HttpTrafficRecorder::OnMessageComplete(HttpTrafficInspector::Direction Dir, + std::string_view Method, + std::string_view Url, + uint16_t StatusCode, + int64_t /*ContentLength*/) +{ + if (!m_Valid) + { + return; + } + + if (Dir == HttpTrafficInspector::Direction::Request) + { + // Record the request boundary. The request spans from m_CurrentRequestStart to m_RequestOffset. + m_PendingReqOffset = m_CurrentRequestStart; + m_PendingReqSize = m_RequestOffset - m_CurrentRequestStart; + m_PendingMethod = Method; + m_PendingUrl = Url; + m_HasPendingRequest = true; + + // Advance start to current offset for the next request. + m_CurrentRequestStart = m_RequestOffset; + } + else + { + // Response complete -- pair with pending request. + RecordedEntry Entry; + if (m_HasPendingRequest) + { + Entry.ReqOffset = m_PendingReqOffset; + Entry.ReqSize = m_PendingReqSize; + Entry.Method = std::move(m_PendingMethod); + Entry.Url = std::move(m_PendingUrl); + m_HasPendingRequest = false; + } + + Entry.RespOffset = m_CurrentResponseStart; + Entry.RespSize = m_ResponseOffset - m_CurrentResponseStart; + Entry.Status = StatusCode; + + m_Entries.push_back(std::move(Entry)); + + // Advance start to current offset for the next response. + m_CurrentResponseStart = m_ResponseOffset; + } +} + +void +HttpTrafficRecorder::Finalize(bool WebSocket, const Oid& SessionId) +{ + if (!m_Valid || m_Finalized) + { + return; + } + m_Finalized = true; + + m_RequestFile.Close(); + m_ResponseFile.Close(); + + // Write index.ucb as a CbObject. + CbObjectWriter Cbo; + + Cbo << "client" << std::string_view(m_ClientLabel); + Cbo << "target" << std::string_view(m_TargetLabel); + Cbo << "startTime" << m_StartTimeMs; + Cbo << "websocket" << WebSocket; + if (SessionId != Oid::Zero) + { + std::string SessionIdStr = SessionId.ToString(); + Cbo << "sessionId" << std::string_view(SessionIdStr); + } + + Cbo.BeginArray("entries"); + for (const RecordedEntry& Entry : m_Entries) + { + Cbo.BeginObject(); + Cbo << "reqOffset" << Entry.ReqOffset; + Cbo << "reqSize" << Entry.ReqSize; + Cbo << "respOffset" << Entry.RespOffset; + Cbo << "respSize" << Entry.RespSize; + Cbo << "method" << std::string_view(Entry.Method); + Cbo << "url" << std::string_view(Entry.Url); + Cbo << "status" << Entry.Status; + Cbo.EndObject(); + } + Cbo.EndArray(); + + CbObject IndexObj = Cbo.Save(); + + MemoryView View = IndexObj.GetView(); + std::error_code Ec; + BasicFile IndexFile(m_Dir / "index.ucb", BasicFile::Mode::kTruncate, Ec); + if (!Ec) + { + IndexFile.Write(View, 0, Ec); + if (Ec) + { + ZEN_WARN("failed to write index.ucb in {} - {}", m_Dir.string(), Ec.message()); + } + IndexFile.Close(); + } + else + { + ZEN_WARN("failed to create index.ucb in {} - {}", m_Dir.string(), Ec.message()); + } + + ZEN_DEBUG("recording finalized in {} ({} entries, websocket: {})", m_Dir.string(), m_Entries.size(), WebSocket); +} + +} // namespace zen diff --git a/src/zenserver/proxy/httptrafficrecorder.h b/src/zenserver/proxy/httptrafficrecorder.h new file mode 100644 index 000000000..bbf22a14e --- /dev/null +++ b/src/zenserver/proxy/httptrafficrecorder.h @@ -0,0 +1,81 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "httptrafficinspector.h" + +#include <zencore/basicfile.h> +#include <zencore/logging.h> + +#include <cstdint> +#include <filesystem> +#include <string> +#include <string_view> +#include <vector> + +namespace zen { + +struct RecordedEntry +{ + uint64_t ReqOffset = 0; + uint64_t ReqSize = 0; + uint64_t RespOffset = 0; + uint64_t RespSize = 0; + std::string Method; + std::string Url; + uint32_t Status = 0; +}; + +class IHttpTrafficObserver; + +class HttpTrafficRecorder : public IHttpTrafficObserver +{ +public: + HttpTrafficRecorder(const std::filesystem::path& OutputDir, std::string_view ClientLabel, std::string_view TargetLabel); + ~HttpTrafficRecorder(); + + bool IsValid() const { return m_Valid; } + + void WriteRequest(const char* Data, size_t Length); + void WriteResponse(const char* Data, size_t Length); + + // IHttpTrafficObserver + void OnMessageComplete(HttpTrafficInspector::Direction Dir, + std::string_view Method, + std::string_view Url, + uint16_t StatusCode, + int64_t ContentLength) override; + + void Finalize(bool WebSocket, const Oid& SessionId); + +private: + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + std::filesystem::path m_Dir; + std::string m_ClientLabel; + std::string m_TargetLabel; + uint64_t m_StartTimeMs = 0; + bool m_Valid = false; + bool m_Finalized = false; + + BasicFile m_RequestFile; + BasicFile m_ResponseFile; + + uint64_t m_RequestOffset = 0; + uint64_t m_ResponseOffset = 0; + + uint64_t m_CurrentRequestStart = 0; + uint64_t m_CurrentResponseStart = 0; + + // Pending request metadata waiting for its paired response. + std::string m_PendingMethod; + std::string m_PendingUrl; + uint64_t m_PendingReqOffset = 0; + uint64_t m_PendingReqSize = 0; + bool m_HasPendingRequest = false; + + std::vector<RecordedEntry> m_Entries; +}; + +} // namespace zen diff --git a/src/zenserver/proxy/tcpproxy.cpp b/src/zenserver/proxy/tcpproxy.cpp new file mode 100644 index 000000000..bdc0de164 --- /dev/null +++ b/src/zenserver/proxy/tcpproxy.cpp @@ -0,0 +1,610 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "tcpproxy.h" + +#include <zencore/logging.h> + +#include <filesystem> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// ProxyMapping + +std::string +ProxyMapping::ListenDescription() const +{ + if (IsUnixListen()) + { + return fmt::format("unix:{}", ListenUnixSocket); + } + std::string Addr = ListenAddress.empty() ? "0.0.0.0" : ListenAddress; + return fmt::format("{}:{}", Addr, ListenPort); +} + +std::string +ProxyMapping::TargetDescription() const +{ + if (IsUnixTarget()) + { + return fmt::format("unix:{}", TargetUnixSocket); + } + return fmt::format("{}:{}", TargetHost, TargetPort); +} + +////////////////////////////////////////////////////////////////////////// +// TcpProxySession + +TcpProxySession::TcpProxySession(asio::ip::tcp::socket ClientSocket, const ProxyMapping& Mapping, TcpProxyService& Owner) +: m_ClientTcpSocket(std::move(ClientSocket)) +, m_UpstreamTcpSocket(m_ClientTcpSocket.get_executor()) +#if defined(ASIO_HAS_LOCAL_SOCKETS) +, m_ClientUnixSocket(m_ClientTcpSocket.get_executor()) +, m_UpstreamUnixSocket(m_ClientTcpSocket.get_executor()) +, m_IsUnixClient(false) +, m_IsUnixTarget(Mapping.IsUnixTarget()) +#endif +, m_TargetHost(Mapping.TargetHost) +, m_TargetPort(Mapping.TargetPort) +, m_TargetUnixSocket(Mapping.TargetUnixSocket) +, m_Owner(Owner) +{ +} + +#if defined(ASIO_HAS_LOCAL_SOCKETS) +TcpProxySession::TcpProxySession(asio::local::stream_protocol::socket ClientSocket, const ProxyMapping& Mapping, TcpProxyService& Owner) +: m_ClientTcpSocket(ClientSocket.get_executor()) +, m_UpstreamTcpSocket(ClientSocket.get_executor()) +, m_ClientUnixSocket(std::move(ClientSocket)) +, m_UpstreamUnixSocket(m_ClientUnixSocket.get_executor()) +, m_IsUnixClient(true) +, m_IsUnixTarget(Mapping.IsUnixTarget()) +, m_TargetHost(Mapping.TargetHost) +, m_TargetPort(Mapping.TargetPort) +, m_TargetUnixSocket(Mapping.TargetUnixSocket) +, m_Owner(Owner) +{ +} +#endif + +LoggerRef +TcpProxySession::Log() +{ + return m_Owner.Log(); +} + +void +TcpProxySession::Start() +{ +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixTarget) + { + ConnectToUnixTarget(); + return; + } +#endif + ConnectToTcpTarget(); +} + +void +TcpProxySession::ConnectToTcpTarget() +{ + auto Self = shared_from_this(); + auto Resolver = std::make_shared<asio::ip::tcp::resolver>(m_UpstreamTcpSocket.get_executor()); + + Resolver->async_resolve(m_TargetHost, + std::to_string(m_TargetPort), + [this, Self, Resolver](const asio::error_code& Ec, asio::ip::tcp::resolver::results_type Results) { + if (Ec) + { + ZEN_WARN("failed to resolve {}:{} - {}", m_TargetHost, m_TargetPort, Ec.message()); + Shutdown(); + return; + } + + asio::async_connect( + m_UpstreamTcpSocket, + Results, + [this, Self](const asio::error_code& ConnectEc, const asio::ip::tcp::endpoint& /*Endpoint*/) { + if (ConnectEc) + { + ZEN_WARN("failed to connect to {}:{} - {}", m_TargetHost, m_TargetPort, ConnectEc.message()); + Shutdown(); + return; + } + + StartRelay(); + }); + }); +} + +#if defined(ASIO_HAS_LOCAL_SOCKETS) +void +TcpProxySession::ConnectToUnixTarget() +{ + auto Self = shared_from_this(); + + asio::local::stream_protocol::endpoint Endpoint(m_TargetUnixSocket); + + m_UpstreamUnixSocket.async_connect(Endpoint, [this, Self](const asio::error_code& Ec) { + if (Ec) + { + ZEN_WARN("failed to connect to unix:{} - {}", m_TargetUnixSocket, Ec.message()); + Shutdown(); + return; + } + + StartRelay(); + }); +} +#endif + +void +TcpProxySession::StartRelay() +{ + asio::error_code Ec; + + // TCP no_delay only applies to TCP sockets. +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (!m_IsUnixClient) +#endif + { + m_ClientTcpSocket.set_option(asio::ip::tcp::no_delay(true), Ec); + } + +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (!m_IsUnixTarget) +#endif + { + m_UpstreamTcpSocket.set_option(asio::ip::tcp::no_delay(true), Ec); + } + + std::string TargetLabel = m_Owner.GetMapping().TargetDescription(); + std::string ClientLabel; + +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixClient) + { + ClientLabel = "unix"; + } + else +#endif + { + asio::ip::tcp::endpoint ClientEndpoint = m_ClientTcpSocket.remote_endpoint(Ec); + if (!Ec) + { + ClientLabel = fmt::format("{}:{}", ClientEndpoint.address().to_string(), ClientEndpoint.port()); + } + else + { + ClientLabel = "?"; + } + } + + m_ClientLabel = ClientLabel; + m_StartTime = std::chrono::steady_clock::now(); + + std::string SessionLabel = fmt::format("{} -> {}", ClientLabel, TargetLabel); + + ZEN_DEBUG("session established {}", SessionLabel); + + m_RequestInspector.emplace(HttpTrafficInspector::Direction::Request, SessionLabel); + m_ResponseInspector.emplace(HttpTrafficInspector::Direction::Response, SessionLabel); + + if (m_Owner.IsRecording()) + { + std::string RecordDir = m_Owner.GetRecordDir(); + if (!RecordDir.empty()) + { + auto Now = std::chrono::system_clock::now(); + uint64_t Ms = uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(Now.time_since_epoch()).count()); + uint64_t Seq = m_Owner.m_RecordSessionCounter.fetch_add(1, std::memory_order_relaxed); + + std::filesystem::path ConnDir = std::filesystem::path(RecordDir) / fmt::format("{}_{}", Ms, Seq); + + m_Recorder = std::make_unique<HttpTrafficRecorder>(ConnDir, ClientLabel, TargetLabel); + if (m_Recorder->IsValid()) + { + m_RequestInspector->SetObserver(m_Recorder.get()); + m_ResponseInspector->SetObserver(m_Recorder.get()); + } + else + { + m_Recorder.reset(); + } + } + } + + ReadFromClient(); + ReadFromUpstream(); +} + +template<typename Fn> +void +TcpProxySession::DispatchClientSocket(Fn&& F) +{ +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixClient) + { + F(m_ClientUnixSocket); + return; + } +#endif + F(m_ClientTcpSocket); +} + +template<typename Fn> +void +TcpProxySession::DispatchUpstreamSocket(Fn&& F) +{ +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixTarget) + { + F(m_UpstreamUnixSocket); + return; + } +#endif + F(m_UpstreamTcpSocket); +} + +void +TcpProxySession::ReadFromClient() +{ + DispatchClientSocket([this](auto& Client) { DoReadFromClient(Client); }); +} + +void +TcpProxySession::ReadFromUpstream() +{ + DispatchUpstreamSocket([this](auto& Upstream) { DoReadFromUpstream(Upstream); }); +} + +template<typename SocketT> +void +TcpProxySession::DoReadFromClient(SocketT& ClientSocket) +{ + auto Self = shared_from_this(); + + ClientSocket.async_read_some(asio::buffer(m_ClientBuffer), [this, Self](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::eof && Ec != asio::error::operation_aborted) + { + ZEN_DEBUG("client read error - {}", Ec.message()); + } + Shutdown(); + return; + } + + uint64_t RequestsBefore = m_RequestInspector ? m_RequestInspector->GetMessageCount() : 0; + if (m_RequestInspector) + { + m_RequestInspector->Inspect(m_ClientBuffer.data(), BytesRead); + } + if (m_Recorder) + { + m_Recorder->WriteRequest(m_ClientBuffer.data(), BytesRead); + } + uint64_t RequestsAfter = m_RequestInspector ? m_RequestInspector->GetMessageCount() : 0; + uint64_t NewRequests = RequestsAfter - RequestsBefore; + + DispatchUpstreamSocket( + [this, Self, BytesRead, NewRequests](auto& Upstream) { DoForwardToUpstream(Upstream, BytesRead, NewRequests); }); + }); +} + +template<typename SocketT> +void +TcpProxySession::DoForwardToUpstream(SocketT& UpstreamSocket, size_t BytesToWrite, uint64_t NewRequests) +{ + auto Self = shared_from_this(); + + asio::async_write(UpstreamSocket, + asio::buffer(m_ClientBuffer.data(), BytesToWrite), + [this, Self, BytesToWrite, NewRequests](const asio::error_code& WriteEc, size_t /*BytesWritten*/) { + if (WriteEc) + { + if (WriteEc != asio::error::operation_aborted) + { + ZEN_DEBUG("upstream write error - {}", WriteEc.message()); + } + Shutdown(); + return; + } + + m_Owner.m_TotalBytesFromClient.fetch_add(BytesToWrite, std::memory_order_relaxed); + m_BytesFromClient.fetch_add(BytesToWrite, std::memory_order_relaxed); + m_Owner.m_BytesMeter.Mark(BytesToWrite); + if (NewRequests > 0) + { + m_Owner.m_RequestMeter.Mark(NewRequests); + } + ReadFromClient(); + }); +} + +template<typename SocketT> +void +TcpProxySession::DoReadFromUpstream(SocketT& UpstreamSocket) +{ + auto Self = shared_from_this(); + + UpstreamSocket.async_read_some(asio::buffer(m_UpstreamBuffer), [this, Self](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::eof && Ec != asio::error::operation_aborted) + { + ZEN_DEBUG("upstream read error - {}", Ec.message()); + } + Shutdown(); + return; + } + + if (m_ResponseInspector) + { + m_ResponseInspector->Inspect(m_UpstreamBuffer.data(), BytesRead); + } + if (m_Recorder) + { + m_Recorder->WriteResponse(m_UpstreamBuffer.data(), BytesRead); + } + + DispatchClientSocket([this, Self, BytesRead](auto& Client) { DoForwardToClient(Client, BytesRead); }); + }); +} + +template<typename SocketT> +void +TcpProxySession::DoForwardToClient(SocketT& ClientSocket, size_t BytesToWrite) +{ + auto Self = shared_from_this(); + + asio::async_write(ClientSocket, + asio::buffer(m_UpstreamBuffer.data(), BytesToWrite), + [this, Self, BytesToWrite](const asio::error_code& WriteEc, size_t /*BytesWritten*/) { + if (WriteEc) + { + if (WriteEc != asio::error::operation_aborted) + { + ZEN_DEBUG("client write error - {}", WriteEc.message()); + } + Shutdown(); + return; + } + + m_Owner.m_TotalBytesToClient.fetch_add(BytesToWrite, std::memory_order_relaxed); + m_BytesToClient.fetch_add(BytesToWrite, std::memory_order_relaxed); + m_Owner.m_BytesMeter.Mark(BytesToWrite); + ReadFromUpstream(); + }); +} + +template<typename SocketT> +void +TcpProxySession::DoShutdownSocket(SocketT& Socket) +{ + if (Socket.is_open()) + { + asio::error_code Ec; + Socket.shutdown(asio::socket_base::shutdown_both, Ec); + Socket.close(Ec); + } +} + +void +TcpProxySession::Shutdown() +{ + if (m_ShutdownCalled.exchange(true)) + { + return; + } + +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixClient) + { + DoShutdownSocket(m_ClientUnixSocket); + } + else +#endif + { + DoShutdownSocket(m_ClientTcpSocket); + } + +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_IsUnixTarget) + { + DoShutdownSocket(m_UpstreamUnixSocket); + } + else +#endif + { + DoShutdownSocket(m_UpstreamTcpSocket); + } + + if (m_Recorder) + { + bool WebSocket = m_RequestInspector && m_RequestInspector->IsUpgraded(); + Oid SessionId = m_RequestInspector ? m_RequestInspector->GetSessionId() : Oid::Zero; + m_Recorder->Finalize(WebSocket, SessionId); + } + + m_Owner.m_ActiveConnections.fetch_sub(1, std::memory_order_relaxed); + m_Owner.RemoveSession(this); +} + +////////////////////////////////////////////////////////////////////////// +// TcpProxyService + +TcpProxyService::TcpProxyService(asio::io_context& IoContext, const ProxyMapping& Mapping) +: m_Log(logging::Get("proxy")) +, m_Mapping(Mapping) +, m_IoContext(IoContext) +, m_TcpAcceptor(IoContext) +#if defined(ASIO_HAS_LOCAL_SOCKETS) +, m_UnixAcceptor(IoContext) +#endif +{ + if (!Mapping.IsUnixListen()) + { + asio::ip::address ListenAddr = + Mapping.ListenAddress.empty() ? asio::ip::address_v4::any() : asio::ip::make_address(Mapping.ListenAddress); + m_ListenEndpoint = asio::ip::tcp::endpoint(ListenAddr, Mapping.ListenPort); + } +} + +void +TcpProxyService::Start() +{ +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_Mapping.IsUnixListen()) + { + // Remove stale socket file if it exists. + std::error_code RemoveEc; + std::filesystem::remove(m_Mapping.ListenUnixSocket, RemoveEc); + + asio::local::stream_protocol::endpoint Endpoint(m_Mapping.ListenUnixSocket); + m_UnixAcceptor.open(Endpoint.protocol()); + m_UnixAcceptor.bind(Endpoint); + m_UnixAcceptor.listen(); + + ZEN_INFO("listening on {} -> {}", m_Mapping.ListenDescription(), m_Mapping.TargetDescription()); + + DoAcceptUnix(); + return; + } +#endif + + m_TcpAcceptor.open(m_ListenEndpoint.protocol()); + m_TcpAcceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true)); + m_TcpAcceptor.bind(m_ListenEndpoint); + m_TcpAcceptor.listen(); + + ZEN_INFO("listening on {} -> {}", m_Mapping.ListenDescription(), m_Mapping.TargetDescription()); + + DoAccept(); +} + +void +TcpProxyService::Stop() +{ + m_Stopped = true; + + asio::error_code Ec; + +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_Mapping.IsUnixListen()) + { + m_UnixAcceptor.close(Ec); + + // Clean up the socket file. + std::error_code RemoveEc; + std::filesystem::remove(m_Mapping.ListenUnixSocket, RemoveEc); + return; + } +#endif + + m_TcpAcceptor.close(Ec); +} + +void +TcpProxyService::OnAcceptedSession(std::shared_ptr<TcpProxySession> Session) +{ + m_TotalConnections.fetch_add(1, std::memory_order_relaxed); + uint64_t Active = m_ActiveConnections.fetch_add(1, std::memory_order_relaxed) + 1; + uint64_t Peak = m_PeakActiveConnections.load(std::memory_order_relaxed); + while (Active > Peak && !m_PeakActiveConnections.compare_exchange_weak(Peak, Active, std::memory_order_relaxed)) + ; + AddSession(Session); + Session->Start(); +} + +void +TcpProxyService::AddSession(std::shared_ptr<TcpProxySession> Session) +{ + RwLock::ExclusiveLockScope Lock(m_SessionsLock); + m_Sessions.push_back(std::move(Session)); +} + +void +TcpProxyService::RemoveSession(TcpProxySession* Session) +{ + RwLock::ExclusiveLockScope Lock(m_SessionsLock); + auto It = std::find_if(m_Sessions.begin(), m_Sessions.end(), [Session](const std::shared_ptr<TcpProxySession>& S) { + return S.get() == Session; + }); + if (It != m_Sessions.end()) + { + // Swap-and-pop for O(1) removal; order doesn't matter. + std::swap(*It, m_Sessions.back()); + m_Sessions.pop_back(); + } +} + +std::vector<std::shared_ptr<TcpProxySession>> +TcpProxyService::GetActiveSessions() const +{ + RwLock::SharedLockScope Lock(m_SessionsLock); + return m_Sessions; +} + +void +TcpProxyService::SetRecording(bool Enabled, const std::string& Dir) +{ + { + RwLock::ExclusiveLockScope Lock(m_RecordDirLock); + m_RecordDir = Dir; + } + m_RecordingEnabled.store(Enabled, std::memory_order_relaxed); + ZEN_INFO("proxy recording {} (dir: {})", Enabled ? "enabled" : "disabled", Dir); +} + +std::string +TcpProxyService::GetRecordDir() const +{ + RwLock::SharedLockScope Lock(m_RecordDirLock); + return m_RecordDir; +} + +void +TcpProxyService::DoAccept() +{ + m_TcpAcceptor.async_accept([this](const asio::error_code& Ec, asio::ip::tcp::socket Socket) { + if (Ec) + { + if (!m_Stopped) + { + ZEN_WARN("accept error - {}", Ec.message()); + } + return; + } + + ZEN_DEBUG("accepted connection from {}", Socket.remote_endpoint().address().to_string()); + + OnAcceptedSession(std::make_shared<TcpProxySession>(std::move(Socket), m_Mapping, *this)); + DoAccept(); + }); +} + +#if defined(ASIO_HAS_LOCAL_SOCKETS) +void +TcpProxyService::DoAcceptUnix() +{ + m_UnixAcceptor.async_accept([this](const asio::error_code& Ec, asio::local::stream_protocol::socket Socket) { + if (Ec) + { + if (!m_Stopped) + { + ZEN_WARN("accept error - {}", Ec.message()); + } + return; + } + + ZEN_DEBUG("accepted unix connection"); + + OnAcceptedSession(std::make_shared<TcpProxySession>(std::move(Socket), m_Mapping, *this)); + DoAcceptUnix(); + }); +} +#endif + +} // namespace zen diff --git a/src/zenserver/proxy/tcpproxy.h b/src/zenserver/proxy/tcpproxy.h new file mode 100644 index 000000000..7eb5c8dff --- /dev/null +++ b/src/zenserver/proxy/tcpproxy.h @@ -0,0 +1,196 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "httptrafficinspector.h" +#include "httptrafficrecorder.h" + +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zentelemetry/stats.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +#if defined(ASIO_HAS_LOCAL_SOCKETS) +# include <asio/local/stream_protocol.hpp> +#endif +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <chrono> +#include <cstdint> +#include <memory> +#include <optional> +#include <string> +#include <vector> + +namespace zen { + +struct ProxyMapping +{ + std::string ListenAddress; + uint16_t ListenPort = 0; + std::string ListenUnixSocket; + std::string TargetHost; + uint16_t TargetPort = 0; + std::string TargetUnixSocket; + + bool IsUnixListen() const { return !ListenUnixSocket.empty(); } + bool IsUnixTarget() const { return !TargetUnixSocket.empty(); } + std::string ListenDescription() const; + std::string TargetDescription() const; +}; + +class TcpProxyService; + +class TcpProxySession : public std::enable_shared_from_this<TcpProxySession> +{ +public: + TcpProxySession(asio::ip::tcp::socket ClientSocket, const ProxyMapping& Mapping, TcpProxyService& Owner); +#if defined(ASIO_HAS_LOCAL_SOCKETS) + TcpProxySession(asio::local::stream_protocol::socket ClientSocket, const ProxyMapping& Mapping, TcpProxyService& Owner); +#endif + + void Start(); + + const std::string& GetClientLabel() const { return m_ClientLabel; } + std::chrono::steady_clock::time_point GetStartTime() const { return m_StartTime; } + uint64_t GetBytesFromClient() const { return m_BytesFromClient.load(std::memory_order_relaxed); } + uint64_t GetBytesToClient() const { return m_BytesToClient.load(std::memory_order_relaxed); } + uint64_t GetRequestCount() const { return m_RequestInspector ? m_RequestInspector->GetMessageCount() : 0; } + bool IsWebSocket() const { return m_RequestInspector && m_RequestInspector->IsUpgraded(); } + bool HasSessionId() const { return m_RequestInspector && m_RequestInspector->HasSessionId(); } + Oid GetSessionId() const { return m_RequestInspector ? m_RequestInspector->GetSessionId() : Oid::Zero; } + +private: + LoggerRef Log(); + + void ConnectToTcpTarget(); +#if defined(ASIO_HAS_LOCAL_SOCKETS) + void ConnectToUnixTarget(); +#endif + void StartRelay(); + + void ReadFromClient(); + void ReadFromUpstream(); + + template<typename Fn> + void DispatchClientSocket(Fn&& F); + template<typename Fn> + void DispatchUpstreamSocket(Fn&& F); + + template<typename SocketT> + void DoReadFromClient(SocketT& ClientSocket); + template<typename SocketT> + void DoReadFromUpstream(SocketT& UpstreamSocket); + template<typename SocketT> + void DoForwardToUpstream(SocketT& UpstreamSocket, size_t BytesToWrite, uint64_t NewRequests); + template<typename SocketT> + void DoForwardToClient(SocketT& ClientSocket, size_t BytesToWrite); + template<typename SocketT> + void DoShutdownSocket(SocketT& Socket); + + void Shutdown(); + + asio::ip::tcp::socket m_ClientTcpSocket; + asio::ip::tcp::socket m_UpstreamTcpSocket; +#if defined(ASIO_HAS_LOCAL_SOCKETS) + asio::local::stream_protocol::socket m_ClientUnixSocket; + asio::local::stream_protocol::socket m_UpstreamUnixSocket; + bool m_IsUnixClient = false; + bool m_IsUnixTarget = false; +#endif + + std::string m_TargetHost; + uint16_t m_TargetPort; + std::string m_TargetUnixSocket; + + TcpProxyService& m_Owner; + + static constexpr size_t kBufferSize = 16 * 1024; + + std::array<char, kBufferSize> m_ClientBuffer; + std::array<char, kBufferSize> m_UpstreamBuffer; + + std::atomic<bool> m_ShutdownCalled{false}; + + std::string m_ClientLabel; + std::chrono::steady_clock::time_point m_StartTime; + std::atomic<uint64_t> m_BytesFromClient{0}; + std::atomic<uint64_t> m_BytesToClient{0}; + + std::optional<HttpTrafficInspector> m_RequestInspector; + std::optional<HttpTrafficInspector> m_ResponseInspector; + std::unique_ptr<HttpTrafficRecorder> m_Recorder; +}; + +class TcpProxyService +{ +public: + TcpProxyService(asio::io_context& IoContext, const ProxyMapping& Mapping); + + void Start(); + void Stop(); + + const ProxyMapping& GetMapping() const { return m_Mapping; } + + uint64_t GetTotalConnections() const { return m_TotalConnections.load(std::memory_order_relaxed); } + uint64_t GetActiveConnections() const { return m_ActiveConnections.load(std::memory_order_relaxed); } + uint64_t GetPeakActiveConnections() const { return m_PeakActiveConnections.load(std::memory_order_relaxed); } + uint64_t GetTotalBytesFromClient() const { return m_TotalBytesFromClient.load(std::memory_order_relaxed); } + uint64_t GetTotalBytesToClient() const { return m_TotalBytesToClient.load(std::memory_order_relaxed); } + + metrics::Meter& GetRequestMeter() { return m_RequestMeter; } + metrics::Meter& GetBytesMeter() { return m_BytesMeter; } + + // Returns a snapshot of active sessions under a shared lock. + std::vector<std::shared_ptr<TcpProxySession>> GetActiveSessions() const; + + void SetRecording(bool Enabled, const std::string& Dir); + bool IsRecording() const { return m_RecordingEnabled.load(std::memory_order_relaxed); } + std::string GetRecordDir() const; + + LoggerRef Log() { return m_Log; } + +private: + friend class TcpProxySession; + + void DoAccept(); +#if defined(ASIO_HAS_LOCAL_SOCKETS) + void DoAcceptUnix(); +#endif + + void OnAcceptedSession(std::shared_ptr<TcpProxySession> Session); + + LoggerRef m_Log; + ProxyMapping m_Mapping; + asio::io_context& m_IoContext; + asio::ip::tcp::acceptor m_TcpAcceptor; + asio::ip::tcp::endpoint m_ListenEndpoint; +#if defined(ASIO_HAS_LOCAL_SOCKETS) + asio::local::stream_protocol::acceptor m_UnixAcceptor; +#endif + bool m_Stopped = false; + + void AddSession(std::shared_ptr<TcpProxySession> Session); + void RemoveSession(TcpProxySession* Session); + + std::atomic<uint64_t> m_TotalConnections{0}; + std::atomic<uint64_t> m_ActiveConnections{0}; + std::atomic<uint64_t> m_PeakActiveConnections{0}; + std::atomic<uint64_t> m_TotalBytesFromClient{0}; + std::atomic<uint64_t> m_TotalBytesToClient{0}; + + metrics::Meter m_RequestMeter; + metrics::Meter m_BytesMeter; + + mutable RwLock m_SessionsLock; + std::vector<std::shared_ptr<TcpProxySession>> m_Sessions; + + std::atomic<bool> m_RecordingEnabled{false}; + mutable RwLock m_RecordDirLock; + std::string m_RecordDir; + std::atomic<uint64_t> m_RecordSessionCounter{0}; +}; + +} // namespace zen diff --git a/src/zenserver/proxy/zenproxyserver.cpp b/src/zenserver/proxy/zenproxyserver.cpp new file mode 100644 index 000000000..1fd9cd2c4 --- /dev/null +++ b/src/zenserver/proxy/zenproxyserver.cpp @@ -0,0 +1,517 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenproxyserver.h" + +#include "frontend/frontend.h" +#include "proxy/httpproxystats.h" + +#include <zenhttp/httpapiservice.h> + +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/windows.h> +#include <zenutil/service.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// Configurator + +void +ZenProxyServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("proxy", + "", + "proxy-map", + "Proxy mapping (see documentation for full format)", + cxxopts::value<std::vector<std::string>>(m_RawProxyMappings), + ""); + + Options.parse_positional({"proxy-map"}); + Options.show_positional_help(); +} + +void +ZenProxyServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenProxyServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenProxyServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +static ProxyMapping +ParseProxyMapping(const std::string& Raw) +{ + // Preferred format using "=" as the listen/target separator: + // listen_spec=target_spec + // where listen_spec is [addr:]port or unix:path + // and target_spec is host:port or unix:path + // + // Examples: + // 9000=127.0.0.1:8558 (TCP -> TCP) + // 10.0.0.1:9000=10.0.0.2:8558 (TCP -> TCP) + // 9000=unix:/tmp/target.sock (TCP -> Unix) + // 9000=unix:C:\Users\foo\zen.sock (TCP -> Unix, Windows path) + // unix:/tmp/listen.sock=localhost:8558 (Unix -> TCP) + // unix:C:\foo\l.sock=unix:C:\foo\t.sock (Unix -> Unix, Windows paths) + // + // Legacy format using colon-only separators (no "=" present): + // [listen_addr:]listen_port:target_host:target_port (TCP -> TCP) + // [listen_addr:]listen_port:unix:target_socket_path (TCP -> Unix) + // unix:listen_socket_path:target_host:target_port (Unix -> TCP, path must not contain colons) + // unix:listen_socket_path:unix:target_socket_path (Unix -> Unix, listen path must not contain colons) + + auto ThrowBadMapping = [&](std::string_view Detail) { + throw OptionParseException(fmt::format("invalid proxy mapping '{}': {}", Raw, Detail), ""); + }; + + auto ParsePort = [&](std::string_view Field) -> uint16_t { + std::optional<uint16_t> Port = ParseInt<uint16_t>(Field); + if (!Port) + { + ThrowBadMapping(fmt::format("'{}' is not a valid port number", Field)); + } + return *Port; + }; + + auto RequireNonEmpty = [&](std::string_view Value, std::string_view Label) { + if (Value.empty()) + { + ThrowBadMapping(fmt::format("empty {}", Label)); + } + }; + + // Parse a listen spec: [addr:]port or unix:path + auto ParseListenSpec = [&](std::string_view Spec, ProxyMapping& Out) { + if (Spec.substr(0, 5) == "unix:") + { + Out.ListenUnixSocket = Spec.substr(5); + RequireNonEmpty(Out.ListenUnixSocket, "listen unix socket path"); + } + else + { + size_t ColonPos = Spec.find(':'); + if (ColonPos == std::string_view::npos) + { + Out.ListenPort = ParsePort(Spec); + } + else + { + Out.ListenAddress = Spec.substr(0, ColonPos); + Out.ListenPort = ParsePort(Spec.substr(ColonPos + 1)); + } + } + }; + + // Parse a target spec: host:port or unix:path + auto ParseTargetSpec = [&](std::string_view Spec, ProxyMapping& Out) { + if (Spec.substr(0, 5) == "unix:") + { + Out.TargetUnixSocket = Spec.substr(5); + RequireNonEmpty(Out.TargetUnixSocket, "target unix socket path"); + } + else + { + size_t ColonPos = Spec.rfind(':'); + if (ColonPos == std::string_view::npos) + { + ThrowBadMapping("target must be host:port or unix:path"); + } + Out.TargetHost = Spec.substr(0, ColonPos); + Out.TargetPort = ParsePort(Spec.substr(ColonPos + 1)); + } + }; + + ProxyMapping Mapping; + + // Check for the "=" separator first. + size_t EqPos = Raw.find('='); + if (EqPos != std::string::npos) + { + std::string_view ListenSpec = std::string_view(Raw).substr(0, EqPos); + std::string_view TargetSpec = std::string_view(Raw).substr(EqPos + 1); + + RequireNonEmpty(ListenSpec, "listen spec"); + RequireNonEmpty(TargetSpec, "target spec"); + + ParseListenSpec(ListenSpec, Mapping); + ParseTargetSpec(TargetSpec, Mapping); + return Mapping; + } + + // Legacy colon-only format. Extract fields left-to-right; when we encounter the + // "unix" keyword, everything after the next colon is the socket path taken verbatim. + // Listen-side unix socket paths must not contain colons in this format. + + auto RequireColon = [&](size_t From) -> size_t { + size_t Pos = Raw.find(':', From); + if (Pos == std::string::npos) + { + ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); + } + return Pos; + }; + + size_t Pos1 = RequireColon(0); + std::string Field1 = Raw.substr(0, Pos1); + + size_t Pos2 = RequireColon(Pos1 + 1); + std::string Field2 = Raw.substr(Pos1 + 1, Pos2 - Pos1 - 1); + + // unix:listen_path:... + if (Field1 == "unix") + { + Mapping.ListenUnixSocket = Field2; + RequireNonEmpty(Mapping.ListenUnixSocket, "listen unix socket path"); + + ParseTargetSpec(std::string_view(Raw).substr(Pos2 + 1), Mapping); + return Mapping; + } + + // listen_port:unix:target_socket_path + if (Field2 == "unix") + { + Mapping.ListenPort = ParsePort(Field1); + Mapping.TargetUnixSocket = Raw.substr(Pos2 + 1); + RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); + return Mapping; + } + + size_t Pos3 = Raw.find(':', Pos2 + 1); + if (Pos3 == std::string::npos) + { + // listen_port:target_host:target_port + Mapping.ListenPort = ParsePort(Field1); + Mapping.TargetHost = Field2; + Mapping.TargetPort = ParsePort(std::string_view(Raw).substr(Pos2 + 1)); + return Mapping; + } + + std::string Field3 = Raw.substr(Pos2 + 1, Pos3 - Pos2 - 1); + + // listen_addr:listen_port:unix:target_socket_path + if (Field3 == "unix") + { + Mapping.ListenAddress = Field1; + Mapping.ListenPort = ParsePort(Field2); + Mapping.TargetUnixSocket = Raw.substr(Pos3 + 1); + RequireNonEmpty(Mapping.TargetUnixSocket, "target unix socket path"); + return Mapping; + } + + // listen_addr:listen_port:target_host:target_port + std::string Field4 = Raw.substr(Pos3 + 1); + if (Field4.find(':') != std::string::npos) + { + ThrowBadMapping("expected [listen_addr:]listen_port:target_host:target_port or use '=' separator"); + } + + Mapping.ListenAddress = Field1; + Mapping.ListenPort = ParsePort(Field2); + Mapping.TargetHost = Field3; + Mapping.TargetPort = ParsePort(Field4); + return Mapping; +} + +void +ZenProxyServerConfigurator::ValidateOptions() +{ + if (m_ServerOptions.BasePort == 0) + { + m_ServerOptions.BasePort = ZenProxyServerConfig::kDefaultProxyPort; + } + + if (m_ServerOptions.DataDir.empty()) + { + std::filesystem::path SystemRoot = m_ServerOptions.SystemRootDir; + if (SystemRoot.empty()) + { + SystemRoot = PickDefaultSystemRootDirectory(); + } + if (!SystemRoot.empty()) + { + m_ServerOptions.DataDir = SystemRoot / "Proxy"; + } + } + + for (const std::string& Raw : m_RawProxyMappings) + { + // The mode keyword "proxy" from argv[1] gets captured as a positional + // argument — skip it. + if (Raw == "proxy") + { + continue; + } + + m_ServerOptions.ProxyMappings.push_back(ParseProxyMapping(Raw)); + } +} + +////////////////////////////////////////////////////////////////////////// +// ZenProxyServer + +ZenProxyServer::ZenProxyServer() +{ +} + +ZenProxyServer::~ZenProxyServer() +{ + Cleanup(); +} + +int +ZenProxyServer::Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenProxyServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in PROXY server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + for (const ProxyMapping& Mapping : ServerConfig.ProxyMappings) + { + auto Service = std::make_unique<TcpProxyService>(m_ProxyIoContext, Mapping); + Service->Start(); + m_ProxyServices.push_back(std::move(Service)); + } + + // Keep the io_context alive even when there is no pending work, so that + // worker threads don't exit prematurely between async operations. + m_ProxyIoWorkGuard = std::make_unique<asio::io_context::work>(m_ProxyIoContext); + + // Start proxy I/O worker threads. Use a modest thread count — proxy work is + // I/O-bound so we don't need a thread per core, but having more than one + // avoids head-of-line blocking when many connections are active. + unsigned int ThreadCount = std::max(GetHardwareConcurrency() / 4, 4u); + + for (unsigned int i = 0; i < ThreadCount; ++i) + { + m_ProxyIoThreads.emplace_back([this, i] { + ExtendableStringBuilder<32> ThreadName; + ThreadName << "proxy_io_" << i; + SetCurrentThreadName(ThreadName); + m_ProxyIoContext.run(); + }); + } + + ZEN_INFO("proxy I/O thread pool started with {} threads", ThreadCount); + + m_ApiService = std::make_unique<HttpApiService>(*m_Http); + m_Http->RegisterService(*m_ApiService); + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + m_Http->RegisterService(*m_FrontendService); + + std::string DefaultRecordDir = (m_DataRoot / "recordings").string(); + m_ProxyStatsService = std::make_unique<HttpProxyStatsService>(m_ProxyServices, m_StatsService, std::move(DefaultRecordDir)); + m_Http->RegisterService(*m_ProxyStatsService); + + EnsureIoRunner(); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenProxyServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO(R"(__________ __________ )" "\n" + R"(\____ /____ ____ \______ \_______ _______ ______.__. )" "\n" + R"( / // __ \ / \ | ___/\_ __ \/ _ \ \/ < | | )" "\n" + R"( / /\ ___/| | \ | | | | \( <_> > < \___ | )" "\n" + R"(/_______ \___ >___| / |____| |__| \____/__/\_ \/ ____| )" "\n" + R"( \/ \/ \/ \/\/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as PROXY (pid: {})", GetCurrentProcessId()); + +#if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +#endif + +#if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +#endif + + const bool IsInteractiveMode = IsInteractiveSession(); + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +void +ZenProxyServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenProxyServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + for (auto& Service : m_ProxyServices) + { + Service->Stop(); + } + + m_ProxyIoWorkGuard.reset(); + m_ProxyIoContext.stop(); + for (auto& Thread : m_ProxyIoThreads) + { + if (Thread.joinable()) + { + Thread.join(); + } + } + m_ProxyIoThreads.clear(); + m_ProxyServices.clear(); + + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + m_ProxyStatsService.reset(); + m_FrontendService.reset(); + m_ApiService.reset(); + + ShutdownServices(); + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +////////////////////////////////////////////////////////////////////////// +// ZenProxyServerMain + +ZenProxyServerMain::ZenProxyServerMain(ZenProxyServerConfig& ServerOptions) : ZenServerMain(ServerOptions), m_ServerOptions(ServerOptions) +{ +} + +void +ZenProxyServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenProxyServer Server; + Server.SetServerMode("Proxy"); + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr<std::thread> ShutdownThread; + std::unique_ptr<NamedEvent> ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen diff --git a/src/zenserver/proxy/zenproxyserver.h b/src/zenserver/proxy/zenproxyserver.h new file mode 100644 index 000000000..7dad748cf --- /dev/null +++ b/src/zenserver/proxy/zenproxyserver.h @@ -0,0 +1,96 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenserver.h" + +#include "proxy/tcpproxy.h" + +#include <memory> +#include <thread> +#include <vector> + +namespace zen { +class HttpApiService; +class HttpFrontendService; +class HttpProxyStatsService; +} // namespace zen + +namespace cxxopts { +class Options; +} +namespace zen::LuaConfig { +struct Options; +} + +namespace zen { + +struct ZenProxyServerConfig : public ZenServerConfig +{ + static constexpr int kDefaultProxyPort = 8118; + + std::vector<ProxyMapping> ProxyMappings; +}; + +struct ZenProxyServerConfigurator : public ZenServerConfiguratorBase +{ + ZenProxyServerConfigurator(ZenProxyServerConfig& ServerOptions) + : ZenServerConfiguratorBase(ServerOptions) + , m_ServerOptions(ServerOptions) + { + } + + ~ZenProxyServerConfigurator() = default; + +private: + virtual void AddCliOptions(cxxopts::Options& Options) override; + virtual void AddConfigOptions(LuaConfig::Options& Options) override; + virtual void ApplyOptions(cxxopts::Options& Options) override; + virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override; + virtual void ValidateOptions() override; + + ZenProxyServerConfig& m_ServerOptions; + + std::vector<std::string> m_RawProxyMappings; +}; + +class ZenProxyServerMain : public ZenServerMain +{ +public: + ZenProxyServerMain(ZenProxyServerConfig& ServerOptions); + virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override; + + ZenProxyServerMain(const ZenProxyServerMain&) = delete; + ZenProxyServerMain& operator=(const ZenProxyServerMain&) = delete; + + typedef ZenProxyServerConfig Config; + typedef ZenProxyServerConfigurator Configurator; + +private: + ZenProxyServerConfig& m_ServerOptions; +}; + +class ZenProxyServer : public ZenServerBase +{ + ZenProxyServer& operator=(ZenProxyServer&&) = delete; + ZenProxyServer(ZenProxyServer&&) = delete; + +public: + ZenProxyServer(); + ~ZenProxyServer(); + + int Initialize(const ZenProxyServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry); + void Run(); + void Cleanup(); + +private: + asio::io_context m_ProxyIoContext; + std::unique_ptr<asio::io_context::work> m_ProxyIoWorkGuard; + std::vector<std::thread> m_ProxyIoThreads; + std::vector<std::unique_ptr<TcpProxyService>> m_ProxyServices; + std::unique_ptr<HttpApiService> m_ApiService; + std::unique_ptr<HttpFrontendService> m_FrontendService; + std::unique_ptr<HttpProxyStatsService> m_ProxyStatsService; +}; + +} // namespace zen diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index f2ed17f05..3cfaa956d 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -30,6 +30,7 @@ target("zenserver") add_deps("protozero", "asio", "cxxopts") add_deps("sol2") + add_packages("http_parser") add_packages("json11") add_packages("lua") add_packages("consul") diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 49ae1b6ff..ad8d18dc0 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -456,6 +456,7 @@ ZenServerBase::HandleStatusRequest(HttpServerRequest& Request) Cbo << "state" << ToString(m_CurrentState); Cbo << "hostname" << GetMachineName(); Cbo << "cpuUsagePercent" << Metrics.CpuUsagePercent; + Cbo << "serverMode" << std::string_view(m_ServerMode); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index c06093f0d..2b9d68aee 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -48,6 +48,7 @@ public: void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetServerMode(std::string_view Mode) { m_ServerMode = Mode; } void SetTestMode(bool State) { m_TestMode = State; } protected: @@ -64,9 +65,10 @@ protected: bool m_UseSentry = false; bool m_IsPowerCycle = false; - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - bool m_DebugOptionForcedCrash = false; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + bool m_DebugOptionForcedCrash = false; + std::string m_ServerMode = "Server"; std::thread m_IoRunner; asio::io_context m_IoContext; |