diff options
Diffstat (limited to 'src/zen/cmds/bench_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/bench_cmd.cpp | 516 |
1 files changed, 469 insertions, 47 deletions
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp index b9c45a328..658b42da6 100644 --- a/src/zen/cmds/bench_cmd.cpp +++ b/src/zen/cmds/bench_cmd.cpp @@ -3,6 +3,7 @@ #include "bench_cmd.h" #include "bench.h" +#include <zencore/compactbinary.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -11,93 +12,514 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> +#include <zenhttp/httpclient.h> +#include <zentelemetry/stats.h> + +#include <algorithm> +#include <atomic> +#include <csignal> +#include <mutex> +#include <thread> + +static std::atomic<bool> s_BenchAbort{false}; namespace zen { -BenchCommand::BenchCommand() +////////////////////////////////////////////////////////////////////////// +// BenchPurgeSubCmd + +BenchPurgeSubCmd::BenchPurgeSubCmd() +: ZenSubCmdBase("purge", "Purge standby memory (system cache)") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_options()("purge", - "Purge standby memory (system cache)", - cxxopts::value<bool>(m_PurgeStandbyLists)->default_value("false")); - m_Options.add_options()("single", "Do not spawn child processes", cxxopts::value<bool>(m_SingleProcess)->default_value("false")); + SubOptions().add_options()("single", + "Do not spawn child processes", + cxxopts::value<bool>(m_SingleProcess)->default_value("false")); } -BenchCommand::~BenchCommand() = default; - void -BenchCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); - if (!ParseOptions(argc, argv)) + bool Ok = false; + + zen::Stopwatch Timer; + + try + { + zen::bench::util::EmptyStandByList(); + + Ok = true; + } + catch (const zen::bench::util::elevation_required_exception&) + { + ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process"); + } + catch (const std::exception& Ex) { - return; + ZEN_CONSOLE_ERROR("{}", Ex.what()); } #if ZEN_PLATFORM_WINDOWS - if (m_PurgeStandbyLists) + if (!Ok && !m_SingleProcess) { - bool Ok = false; - - zen::Stopwatch Timer; - try { - zen::bench::util::EmptyStandByList(); + zen::CreateProcOptions Cpo; + Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole; - Ok = true; - } - catch (const zen::bench::util::elevation_required_exception&) - { - ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process"); + std::filesystem::path CurExe{zen::GetRunningExecutablePath()}; + + if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench purge --single"), Cpo)) + { + zen::ProcessHandle ProcHandle; + ProcHandle.Initialize(Cpr); + + int ExitCode = ProcHandle.WaitExitCode(); + + if (ExitCode == 0) + { + Ok = true; + } + else + { + ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode); + } + } } catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("{}", Ex.what()); } + } +#endif + + if (Ok) + { + // TODO: could also add reporting on just how much memory was purged + ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } +} + +////////////////////////////////////////////////////////////////////////// +// BenchHttpSubCmd + + +BenchHttpSubCmd::BenchHttpSubCmd() +: ZenSubCmdBase("http", "Benchmark an HTTP server") +{ + SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value<std::string>(m_Url), "<url>"); + SubOptions().add_option("", + "n", + "count", + "Number of requests to send", + cxxopts::value<int>(m_Count)->default_value("100"), + "<count>"); + SubOptions().add_option("", + "c", + "concurrency", + "Number of concurrent threads", + cxxopts::value<int>(m_Concurrency)->default_value("1"), + "<threads>"); + SubOptions().add_option("", + "", + "method", + "HTTP method to use (GET, HEAD)", + cxxopts::value<std::string>(m_Method)->default_value("GET"), + "<method>"); + SubOptions().add_option("", + "", + "unix-socket", + "Unix domain socket path (overrides TCP)", + cxxopts::value<std::string>(m_SocketPath), + "<path>"); + SubOptions().add_options()("no-keepalive", + "Close connection after each request (disables keep-alive)", + cxxopts::value<bool>(m_NoKeepAlive)->default_value("false")); + SubOptions().add_options()("continuous", + "Run until interrupted (Ctrl+C), printing metrics once per second", + cxxopts::value<bool>(m_Continuous)->default_value("false")); + SubOptions().parse_positional({"url"}); +} + +static std::pair<std::string, std::string> +SplitUrl(std::string_view Url) +{ + size_t SchemeEnd = Url.find("://"); + size_t SearchFrom = (SchemeEnd != std::string_view::npos) ? SchemeEnd + 3 : 0; + size_t PathStart = Url.find('/', SearchFrom); + + if (PathStart == std::string_view::npos) + { + return {std::string(Url), "/"}; + } + + return {std::string(Url.substr(0, PathStart)), std::string(Url.substr(PathStart))}; +} + +void +BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + if (m_Url.empty()) + { + throw OptionParseException("URL is required", SubOptions().help()); + } + + if (!m_Continuous && m_Count <= 0) + { + throw OptionParseException("--count must be a positive integer", SubOptions().help()); + } + + if (m_Concurrency <= 0) + { + throw OptionParseException("--concurrency must be a positive integer", SubOptions().help()); + } + + if (m_Method != "GET" && m_Method != "HEAD") + { + throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), + SubOptions().help()); + } + + auto [BaseUri, Path] = SplitUrl(m_Url); + + std::string ModeStr = m_Continuous ? "continuous" : fmt::format("count={}", m_Count); + + if (m_SocketPath.empty()) + { + ZEN_CONSOLE("Benchmarking {} {} ({}, concurrency={})", m_Method, m_Url, ModeStr, m_Concurrency); + } + else + { + ZEN_CONSOLE("Benchmarking {} {} via {} ({}, concurrency={})", m_Method, m_Url, m_SocketPath, ModeStr, m_Concurrency); + } + + // Probe for a zenserver identity. If the target exposes /health/info and the + // response contains a BuildVersion field we print a short summary. Any failure + // (non-zenserver, timeout, unreachable) is silently ignored. + try + { + HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(2000), + .UnixSocketPath = m_SocketPath}; + HttpClient ProbeHttp(BaseUri, ProbeSettings); + HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info"); + + if (ProbeResp.IsSuccess()) + { + CbObject Info = ProbeResp.AsObject(); + std::string_view BuildVersion = Info["BuildVersion"].AsString(); + + if (!BuildVersion.empty()) + { + std::string_view Hostname = Info["Hostname"].AsString(); + int64_t Pid = Info["Pid"].AsInt64(); + std::string_view HttpServerClass = Info["HttpServerClass"].AsString(); + ZEN_CONSOLE("Remote : zenserver {} on {} (pid {}, {})", BuildVersion, Hostname, Pid, HttpServerClass); + + std::string_view OS = Info["OS"].AsString(); + std::string_view Arch = Info["Arch"].AsString(); + + CbObjectView System = Info["System"].AsObjectView(); + int64_t LpCount = System["lp_count"].AsInt64(); + int64_t TotalMemMiB = System["total_memory_mb"].AsInt64(); + + ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM", + OS, + Arch, + LpCount, + NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024)); + } + } + } + catch (...) + { + } + + if (m_Continuous) + { + RunContinuous(BaseUri, Path); + } + else + { + RunFixedCount(BaseUri, Path); + } +} + +void +BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path) +{ + std::atomic<int> NextRequest{0}; + std::vector<double> AllLatencies; + AllLatencies.reserve(m_Count); + std::mutex LatencyMutex; + std::atomic<int> ErrorCount{0}; + std::atomic<int64_t> TotalDownloadedBytes{0}; + std::atomic<int64_t> TotalUploadedBytes{0}; + + Stopwatch Timer; - if (!Ok && !m_SingleProcess) + auto WorkerFn = [&]() { + std::vector<double> LocalLatencies; + + HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, + .ForbidReuseConnection = m_NoKeepAlive}; + HttpClient Http(BaseUri, Settings); + + while (true) { + int RequestIndex = NextRequest.fetch_add(1); + + if (RequestIndex >= m_Count) + { + break; + } + try { - zen::CreateProcOptions Cpo; - Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole; + HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); + + if (Resp.IsSuccess()) + { + LocalLatencies.push_back(Resp.ElapsedSeconds); + TotalDownloadedBytes.fetch_add(Resp.DownloadedBytes); + TotalUploadedBytes.fetch_add(Resp.UploadedBytes); + } + else + { + ErrorCount.fetch_add(1); + } + } + catch (const HttpClientError&) + { + ErrorCount.fetch_add(1); + } + } + + std::lock_guard Lock(LatencyMutex); + AllLatencies.insert(AllLatencies.end(), LocalLatencies.begin(), LocalLatencies.end()); + }; + + std::vector<std::thread> Threads; + Threads.reserve(m_Concurrency); + + for (int i = 0; i < m_Concurrency; ++i) + { + Threads.emplace_back(WorkerFn); + } + + for (std::thread& T : Threads) + { + T.join(); + } + + double TotalSeconds = Timer.GetElapsedTimeMs() / 1000.0; + int SuccessCount = static_cast<int>(AllLatencies.size()); + int TotalCount = SuccessCount + ErrorCount.load(); + + std::sort(AllLatencies.begin(), AllLatencies.end()); + + auto PercentileMs = [&](int Pct) -> double { + if (AllLatencies.empty()) + { + return 0.0; + } + + size_t Index = std::min(AllLatencies.size() * static_cast<size_t>(Pct) / 100, AllLatencies.size() - 1); + + return AllLatencies[Index] * 1000.0; + }; + + double SumMs = 0.0; + for (double L : AllLatencies) + { + SumMs += L * 1000.0; + } + + double MeanMs = SuccessCount > 0 ? SumMs / SuccessCount : 0.0; + double Rps = TotalSeconds > 0.0 ? TotalCount / TotalSeconds : 0.0; + + uint64_t DownBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadedBytes.load() / TotalSeconds) : 0; + uint64_t UpBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadedBytes.load() / TotalSeconds) : 0; + + ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load()); + ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms", + PercentileMs(0), + MeanMs, + PercentileMs(50), + PercentileMs(95), + PercentileMs(99), + PercentileMs(100)); + ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)", + Rps, + NiceBytes(DownBytesPerSec), + NiceBytes(UpBytesPerSec), + TotalSeconds); +} - std::filesystem::path CurExe{zen::GetRunningExecutablePath()}; +void +BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Path) +{ + s_BenchAbort.store(false); + + auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); }); + auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); }); + + metrics::Histogram LatencyHistogram; + std::atomic<int64_t> IntervalSuccessCount{0}; + std::atomic<int64_t> IntervalErrorCount{0}; + std::atomic<int64_t> IntervalDownloadBytes{0}; + std::atomic<int64_t> IntervalUploadBytes{0}; + std::atomic<int64_t> TotalSuccessCount{0}; + std::atomic<int64_t> TotalErrorCount{0}; + std::atomic<int64_t> TotalDownloadBytes{0}; + std::atomic<int64_t> TotalUploadBytes{0}; + + Stopwatch RunTimer; + + auto WorkerFn = [&]() { + HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, + .ForbidReuseConnection = m_NoKeepAlive}; + HttpClient Http(BaseUri, Settings); - if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench --purge --single"), Cpo)) + while (!s_BenchAbort.load(std::memory_order_relaxed)) + { + try + { + HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); + + if (Resp.IsSuccess()) + { + LatencyHistogram.Update(static_cast<int64_t>(Resp.ElapsedSeconds * 1.0e6)); + IntervalSuccessCount.fetch_add(1, std::memory_order_relaxed); + IntervalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); + IntervalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); + TotalSuccessCount.fetch_add(1, std::memory_order_relaxed); + TotalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); + TotalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); + } + else { - zen::ProcessHandle ProcHandle; - ProcHandle.Initialize(Cpr); - - int ExitCode = ProcHandle.WaitExitCode(); - - if (ExitCode == 0) - { - Ok = true; - } - else - { - ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode); - } + IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); + TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } - catch (const std::exception& Ex) + catch (const HttpClientError&) { - ZEN_CONSOLE_ERROR("{}", Ex.what()); + IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); + TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } + }; - if (Ok) + auto ReporterFn = [&]() { + while (!s_BenchAbort.load(std::memory_order_relaxed)) { - // TODO: could also add reporting on just how much memory was purged - ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + // Sleep 1s in short increments to stay responsive to abort + for (int i = 0; i < 10 && !s_BenchAbort.load(std::memory_order_relaxed); ++i) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + if (s_BenchAbort.load(std::memory_order_relaxed)) + { + break; + } + + // Snapshot and reset per-interval counters + int64_t Successes = IntervalSuccessCount.exchange(0); + int64_t Errors = IntervalErrorCount.exchange(0); + int64_t DownBytes = IntervalDownloadBytes.exchange(0); + int64_t UpBytes = IntervalUploadBytes.exchange(0); + + // Snapshot and reset latency histogram + uint64_t HistCount = LatencyHistogram.Count(); + int64_t HistMin = LatencyHistogram.Min(); + int64_t HistMax = LatencyHistogram.Max(); + double HistMean = LatencyHistogram.Mean(); + metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot(); + LatencyHistogram.Clear(); + + // Format elapsed as HH:MM:SS + int TotalSec = static_cast<int>(RunTimer.GetElapsedTimeMs() / 1000.0); + int Hours = TotalSec / 3600; + int Minutes = (TotalSec % 3600) / 60; + int Secs = TotalSec % 60; + + if (HistCount > 0) + { + ZEN_CONSOLE( + "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} down: {}/s up: {}/s", + Hours, + Minutes, + Secs, + Successes, + Errors, + HistMin / 1000.0, + HistMean / 1000.0, + Snap.Get95Percentile() / 1000.0, + Snap.Get99Percentile() / 1000.0, + HistMax / 1000.0, + NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, DownBytes))), + NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, UpBytes)))); + } + else + { + ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", + Hours, + Minutes, + Secs, + Errors); + } } + }; + + std::vector<std::thread> Threads; + Threads.reserve(m_Concurrency + 1); + Threads.emplace_back(ReporterFn); + + for (int i = 0; i < m_Concurrency; ++i) + { + Threads.emplace_back(WorkerFn); } -#endif - return; + for (std::thread& T : Threads) + { + T.join(); + } + + std::signal(SIGINT, PrevSigInt); + std::signal(SIGTERM, PrevSigTerm); + + double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0; + int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load(); + uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadBytes.load() / TotalSeconds) : 0; + uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadBytes.load() / TotalSeconds) : 0; + + ZEN_CONSOLE("Stopped. Total: {:L} requests, {:L} success, {:L} errors avg throughput: down {}/s up {}/s (elapsed: {:.2f}s)", + TotalCount, + TotalSuccessCount.load(), + TotalErrorCount.load(), + NiceBytes(DownPerSec), + NiceBytes(UpPerSec), + TotalSeconds); +} + +////////////////////////////////////////////////////////////////////////// +// BenchCommand + +BenchCommand::BenchCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value<std::string>(m_SubCommand)->default_value(""), ""); + m_Options.parse_positional({"subcommand"}); + + AddSubCommand(m_PurgeSubCmd); + AddSubCommand(m_HttpSubCmd); } +BenchCommand::~BenchCommand() = default; + } // namespace zen |