// Copyright Epic Games, Inc. All Rights Reserved. #include "bench_cmd.h" #include "bench.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static std::atomic s_BenchAbort{false}; namespace zen { ////////////////////////////////////////////////////////////////////////// // BenchPurgeSubCmd BenchPurgeSubCmd::BenchPurgeSubCmd() : ZenSubCmdBase("purge", "Purge standby memory (system cache)") { SubOptions().add_options()("single", "Do not spawn child processes", cxxopts::value(m_SingleProcess)->default_value("false")); } void BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); bool Ok = false; zen::Stopwatch Timer; try { zen::bench::util::EmptyStandByList(); Ok = true; } catch (const zen::bench::util::elevation_required_exception&) { ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process"); } catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("{}", Ex.what()); } #if ZEN_PLATFORM_WINDOWS if (!Ok && !m_SingleProcess) { try { zen::CreateProcOptions Cpo; Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole; std::filesystem::path CurExe{zen::GetRunningExecutablePath()}; if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench purge --single"), Cpo)) { zen::ProcessHandle ProcHandle; ProcHandle.Initialize(Cpr); int ExitCode = ProcHandle.WaitExitCode(); if (ExitCode == 0) { Ok = true; } else { ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode); } } } catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("{}", Ex.what()); } } #endif if (Ok) { // TODO: could also add reporting on just how much memory was purged ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } ////////////////////////////////////////////////////////////////////////// // BenchHttpSubCmd BenchHttpSubCmd::BenchHttpSubCmd() : ZenSubCmdBase("http", "Benchmark an HTTP server") { SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value(m_Url), ""); SubOptions().add_option("", "n", "count", "Number of requests to send", cxxopts::value(m_Count)->default_value("100"), ""); SubOptions().add_option("", "c", "concurrency", "Number of concurrent threads", cxxopts::value(m_Concurrency)->default_value("1"), ""); SubOptions().add_option("", "", "method", "HTTP method to use (GET, HEAD)", cxxopts::value(m_Method)->default_value("GET"), ""); SubOptions() .add_option("", "", "unix-socket", "Unix domain socket path (overrides TCP)", cxxopts::value(m_SocketPath), ""); SubOptions().add_options()("no-keepalive", "Close connection after each request (disables keep-alive)", cxxopts::value(m_NoKeepAlive)->default_value("false")); SubOptions().add_options()("continuous", "Run until interrupted (Ctrl+C), printing metrics once per second", cxxopts::value(m_Continuous)->default_value("false")); SubOptions().parse_positional({"url"}); } static std::pair SplitUrl(std::string_view Url) { size_t SchemeEnd = Url.find("://"); size_t SearchFrom = (SchemeEnd != std::string_view::npos) ? SchemeEnd + 3 : 0; size_t PathStart = Url.find('/', SearchFrom); if (PathStart == std::string_view::npos) { return {std::string(Url), "/"}; } return {std::string(Url.substr(0, PathStart)), std::string(Url.substr(PathStart))}; } void BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); if (m_Url.empty()) { throw OptionParseException("URL is required", SubOptions().help()); } if (!m_Continuous && m_Count <= 0) { throw OptionParseException("--count must be a positive integer", SubOptions().help()); } if (m_Concurrency <= 0) { throw OptionParseException("--concurrency must be a positive integer", SubOptions().help()); } if (m_Method != "GET" && m_Method != "HEAD") { throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), SubOptions().help()); } auto [BaseUri, Path] = SplitUrl(m_Url); std::string ModeStr = m_Continuous ? "continuous" : fmt::format("count={}", m_Count); if (m_SocketPath.empty()) { ZEN_CONSOLE("Benchmarking {} {} ({}, concurrency={})", m_Method, m_Url, ModeStr, m_Concurrency); } else { ZEN_CONSOLE("Benchmarking {} {} via {} ({}, concurrency={})", m_Method, m_Url, m_SocketPath, ModeStr, m_Concurrency); } // Probe for a zenserver identity. If the target exposes /health/info and the // response contains a BuildVersion field we print a short summary. Any failure // (non-zenserver, timeout, unreachable) is silently ignored. try { HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000), .Timeout = std::chrono::milliseconds(2000), .UnixSocketPath = m_SocketPath}; HttpClient ProbeHttp(BaseUri, ProbeSettings); HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info"); if (ProbeResp.IsSuccess()) { CbObject Info = ProbeResp.AsObject(); std::string_view BuildVersion = Info["BuildVersion"].AsString(); if (!BuildVersion.empty()) { std::string_view Hostname = Info["Hostname"].AsString(); int64_t Pid = Info["Pid"].AsInt64(); std::string_view HttpServerClass = Info["HttpServerClass"].AsString(); ZEN_CONSOLE("Remote : zenserver {} on {} (pid {}, {})", BuildVersion, Hostname, Pid, HttpServerClass); std::string_view OS = Info["OS"].AsString(); std::string_view Arch = Info["Arch"].AsString(); CbObjectView System = Info["System"].AsObjectView(); int64_t LpCount = System["lp_count"].AsInt64(); int64_t TotalMemMiB = System["total_memory_mb"].AsInt64(); ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM", OS, Arch, LpCount, NiceBytes(static_cast(TotalMemMiB) * 1024 * 1024)); } } } catch (...) { } if (m_Continuous) { RunContinuous(BaseUri, Path); } else { RunFixedCount(BaseUri, Path); } } void BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path) { std::atomic NextRequest{0}; std::vector AllLatencies; AllLatencies.reserve(m_Count); std::mutex LatencyMutex; std::atomic ErrorCount{0}; std::atomic TotalDownloadedBytes{0}; std::atomic TotalUploadedBytes{0}; Stopwatch Timer; auto WorkerFn = [&]() { std::vector LocalLatencies; HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (true) { int RequestIndex = NextRequest.fetch_add(1); if (RequestIndex >= m_Count) { break; } try { HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); if (Resp.IsSuccess()) { LocalLatencies.push_back(Resp.ElapsedSeconds); TotalDownloadedBytes.fetch_add(Resp.DownloadedBytes); TotalUploadedBytes.fetch_add(Resp.UploadedBytes); } else { ErrorCount.fetch_add(1); } } catch (const HttpClientError&) { ErrorCount.fetch_add(1); } } std::lock_guard Lock(LatencyMutex); AllLatencies.insert(AllLatencies.end(), LocalLatencies.begin(), LocalLatencies.end()); }; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back(WorkerFn); } for (std::thread& T : Threads) { T.join(); } double TotalSeconds = Timer.GetElapsedTimeMs() / 1000.0; int SuccessCount = static_cast(AllLatencies.size()); int TotalCount = SuccessCount + ErrorCount.load(); std::sort(AllLatencies.begin(), AllLatencies.end()); auto PercentileMs = [&](int Pct) -> double { if (AllLatencies.empty()) { return 0.0; } size_t Index = std::min(AllLatencies.size() * static_cast(Pct) / 100, AllLatencies.size() - 1); return AllLatencies[Index] * 1000.0; }; double SumMs = 0.0; for (double L : AllLatencies) { SumMs += L * 1000.0; } double MeanMs = SuccessCount > 0 ? SumMs / SuccessCount : 0.0; double Rps = TotalSeconds > 0.0 ? TotalCount / TotalSeconds : 0.0; uint64_t DownBytesPerSec = TotalSeconds > 0.0 ? static_cast(TotalDownloadedBytes.load() / TotalSeconds) : 0; uint64_t UpBytesPerSec = TotalSeconds > 0.0 ? static_cast(TotalUploadedBytes.load() / TotalSeconds) : 0; ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load()); ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms", PercentileMs(0), MeanMs, PercentileMs(50), PercentileMs(95), PercentileMs(99), PercentileMs(100)); ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)", Rps, NiceBytes(DownBytesPerSec), NiceBytes(UpBytesPerSec), TotalSeconds); } void BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Path) { s_BenchAbort.store(false); auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); }); auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); }); metrics::Histogram LatencyHistogram; std::atomic IntervalSuccessCount{0}; std::atomic IntervalErrorCount{0}; std::atomic IntervalDownloadBytes{0}; std::atomic IntervalUploadBytes{0}; std::atomic TotalSuccessCount{0}; std::atomic TotalErrorCount{0}; std::atomic TotalDownloadBytes{0}; std::atomic TotalUploadBytes{0}; Stopwatch RunTimer; auto WorkerFn = [&]() { HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (!s_BenchAbort.load(std::memory_order_relaxed)) { try { HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); if (Resp.IsSuccess()) { LatencyHistogram.Update(static_cast(Resp.ElapsedSeconds * 1.0e6)); IntervalSuccessCount.fetch_add(1, std::memory_order_relaxed); IntervalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); IntervalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); TotalSuccessCount.fetch_add(1, std::memory_order_relaxed); TotalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); TotalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); } else { IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } catch (const HttpClientError&) { IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } }; auto ReporterFn = [&]() { while (!s_BenchAbort.load(std::memory_order_relaxed)) { // Sleep 1s in short increments to stay responsive to abort for (int i = 0; i < 10 && !s_BenchAbort.load(std::memory_order_relaxed); ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (s_BenchAbort.load(std::memory_order_relaxed)) { break; } // Snapshot and reset per-interval counters int64_t Successes = IntervalSuccessCount.exchange(0); int64_t Errors = IntervalErrorCount.exchange(0); int64_t DownBytes = IntervalDownloadBytes.exchange(0); int64_t UpBytes = IntervalUploadBytes.exchange(0); // Snapshot and reset latency histogram uint64_t HistCount = LatencyHistogram.Count(); int64_t HistMin = LatencyHistogram.Min(); int64_t HistMax = LatencyHistogram.Max(); double HistMean = LatencyHistogram.Mean(); metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot(); LatencyHistogram.Clear(); // Format elapsed as HH:MM:SS int TotalSec = static_cast(RunTimer.GetElapsedTimeMs() / 1000.0); int Hours = TotalSec / 3600; int Minutes = (TotalSec % 3600) / 60; int Secs = TotalSec % 60; if (HistCount > 0) { ZEN_CONSOLE( "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} " "down: {}/s up: {}/s", Hours, Minutes, Secs, Successes, Errors, HistMin / 1000.0, HistMean / 1000.0, Snap.Get95Percentile() / 1000.0, Snap.Get99Percentile() / 1000.0, HistMax / 1000.0, NiceBytes(static_cast(std::max(int64_t{0}, DownBytes))), NiceBytes(static_cast(std::max(int64_t{0}, UpBytes)))); } else { ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", Hours, Minutes, Secs, Errors); } } }; std::vector Threads; Threads.reserve(m_Concurrency + 1); Threads.emplace_back(ReporterFn); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back(WorkerFn); } for (std::thread& T : Threads) { T.join(); } std::signal(SIGINT, PrevSigInt); std::signal(SIGTERM, PrevSigTerm); double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0; int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load(); uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast(TotalDownloadBytes.load() / TotalSeconds) : 0; uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast(TotalUploadBytes.load() / TotalSeconds) : 0; ZEN_CONSOLE("Stopped. Total: {:L} requests, {:L} success, {:L} errors avg throughput: down {}/s up {}/s (elapsed: {:.2f}s)", TotalCount, TotalSuccessCount.load(), TotalErrorCount.load(), NiceBytes(DownPerSec), NiceBytes(UpPerSec), TotalSeconds); } ////////////////////////////////////////////////////////////////////////// // BenchCommand BenchCommand::BenchCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value(m_SubCommand)->default_value(""), ""); m_Options.parse_positional({"subcommand"}); AddSubCommand(m_PurgeSubCmd); AddSubCommand(m_HttpSubCmd); } BenchCommand::~BenchCommand() = default; } // namespace zen