aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/bench_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/bench_cmd.cpp')
-rw-r--r--src/zen/cmds/bench_cmd.cpp516
1 files changed, 469 insertions, 47 deletions
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp
index b9c45a328..658b42da6 100644
--- a/src/zen/cmds/bench_cmd.cpp
+++ b/src/zen/cmds/bench_cmd.cpp
@@ -3,6 +3,7 @@
#include "bench_cmd.h"
#include "bench.h"
+#include <zencore/compactbinary.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -11,93 +12,514 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
+#include <zentelemetry/stats.h>
+
+#include <algorithm>
+#include <atomic>
+#include <csignal>
+#include <mutex>
+#include <thread>
+
+static std::atomic<bool> s_BenchAbort{false};
namespace zen {
-BenchCommand::BenchCommand()
+//////////////////////////////////////////////////////////////////////////
+// BenchPurgeSubCmd
+
+BenchPurgeSubCmd::BenchPurgeSubCmd()
+: ZenSubCmdBase("purge", "Purge standby memory (system cache)")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_options()("purge",
- "Purge standby memory (system cache)",
- cxxopts::value<bool>(m_PurgeStandbyLists)->default_value("false"));
- m_Options.add_options()("single", "Do not spawn child processes", cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
+ SubOptions().add_options()("single",
+ "Do not spawn child processes",
+ cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
}
-BenchCommand::~BenchCommand() = default;
-
void
-BenchCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions)
{
ZEN_UNUSED(GlobalOptions);
- if (!ParseOptions(argc, argv))
+ bool Ok = false;
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ zen::bench::util::EmptyStandByList();
+
+ Ok = true;
+ }
+ catch (const zen::bench::util::elevation_required_exception&)
+ {
+ ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process");
+ }
+ catch (const std::exception& Ex)
{
- return;
+ ZEN_CONSOLE_ERROR("{}", Ex.what());
}
#if ZEN_PLATFORM_WINDOWS
- if (m_PurgeStandbyLists)
+ if (!Ok && !m_SingleProcess)
{
- bool Ok = false;
-
- zen::Stopwatch Timer;
-
try
{
- zen::bench::util::EmptyStandByList();
+ zen::CreateProcOptions Cpo;
+ Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole;
- Ok = true;
- }
- catch (const zen::bench::util::elevation_required_exception&)
- {
- ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process");
+ std::filesystem::path CurExe{zen::GetRunningExecutablePath()};
+
+ if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench purge --single"), Cpo))
+ {
+ zen::ProcessHandle ProcHandle;
+ ProcHandle.Initialize(Cpr);
+
+ int ExitCode = ProcHandle.WaitExitCode();
+
+ if (ExitCode == 0)
+ {
+ Ok = true;
+ }
+ else
+ {
+ ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode);
+ }
+ }
}
catch (const std::exception& Ex)
{
ZEN_CONSOLE_ERROR("{}", Ex.what());
}
+ }
+#endif
+
+ if (Ok)
+ {
+ // TODO: could also add reporting on just how much memory was purged
+ ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// BenchHttpSubCmd
+
+
+BenchHttpSubCmd::BenchHttpSubCmd()
+: ZenSubCmdBase("http", "Benchmark an HTTP server")
+{
+ SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value<std::string>(m_Url), "<url>");
+ SubOptions().add_option("",
+ "n",
+ "count",
+ "Number of requests to send",
+ cxxopts::value<int>(m_Count)->default_value("100"),
+ "<count>");
+ SubOptions().add_option("",
+ "c",
+ "concurrency",
+ "Number of concurrent threads",
+ cxxopts::value<int>(m_Concurrency)->default_value("1"),
+ "<threads>");
+ SubOptions().add_option("",
+ "",
+ "method",
+ "HTTP method to use (GET, HEAD)",
+ cxxopts::value<std::string>(m_Method)->default_value("GET"),
+ "<method>");
+ SubOptions().add_option("",
+ "",
+ "unix-socket",
+ "Unix domain socket path (overrides TCP)",
+ cxxopts::value<std::string>(m_SocketPath),
+ "<path>");
+ SubOptions().add_options()("no-keepalive",
+ "Close connection after each request (disables keep-alive)",
+ cxxopts::value<bool>(m_NoKeepAlive)->default_value("false"));
+ SubOptions().add_options()("continuous",
+ "Run until interrupted (Ctrl+C), printing metrics once per second",
+ cxxopts::value<bool>(m_Continuous)->default_value("false"));
+ SubOptions().parse_positional({"url"});
+}
+
+static std::pair<std::string, std::string>
+SplitUrl(std::string_view Url)
+{
+ size_t SchemeEnd = Url.find("://");
+ size_t SearchFrom = (SchemeEnd != std::string_view::npos) ? SchemeEnd + 3 : 0;
+ size_t PathStart = Url.find('/', SearchFrom);
+
+ if (PathStart == std::string_view::npos)
+ {
+ return {std::string(Url), "/"};
+ }
+
+ return {std::string(Url.substr(0, PathStart)), std::string(Url.substr(PathStart))};
+}
+
+void
+BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
+{
+ ZEN_UNUSED(GlobalOptions);
+
+ if (m_Url.empty())
+ {
+ throw OptionParseException("URL is required", SubOptions().help());
+ }
+
+ if (!m_Continuous && m_Count <= 0)
+ {
+ throw OptionParseException("--count must be a positive integer", SubOptions().help());
+ }
+
+ if (m_Concurrency <= 0)
+ {
+ throw OptionParseException("--concurrency must be a positive integer", SubOptions().help());
+ }
+
+ if (m_Method != "GET" && m_Method != "HEAD")
+ {
+ throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method),
+ SubOptions().help());
+ }
+
+ auto [BaseUri, Path] = SplitUrl(m_Url);
+
+ std::string ModeStr = m_Continuous ? "continuous" : fmt::format("count={}", m_Count);
+
+ if (m_SocketPath.empty())
+ {
+ ZEN_CONSOLE("Benchmarking {} {} ({}, concurrency={})", m_Method, m_Url, ModeStr, m_Concurrency);
+ }
+ else
+ {
+ ZEN_CONSOLE("Benchmarking {} {} via {} ({}, concurrency={})", m_Method, m_Url, m_SocketPath, ModeStr, m_Concurrency);
+ }
+
+ // Probe for a zenserver identity. If the target exposes /health/info and the
+ // response contains a BuildVersion field we print a short summary. Any failure
+ // (non-zenserver, timeout, unreachable) is silently ignored.
+ try
+ {
+ HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(2000),
+ .UnixSocketPath = m_SocketPath};
+ HttpClient ProbeHttp(BaseUri, ProbeSettings);
+ HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info");
+
+ if (ProbeResp.IsSuccess())
+ {
+ CbObject Info = ProbeResp.AsObject();
+ std::string_view BuildVersion = Info["BuildVersion"].AsString();
+
+ if (!BuildVersion.empty())
+ {
+ std::string_view Hostname = Info["Hostname"].AsString();
+ int64_t Pid = Info["Pid"].AsInt64();
+ std::string_view HttpServerClass = Info["HttpServerClass"].AsString();
+ ZEN_CONSOLE("Remote : zenserver {} on {} (pid {}, {})", BuildVersion, Hostname, Pid, HttpServerClass);
+
+ std::string_view OS = Info["OS"].AsString();
+ std::string_view Arch = Info["Arch"].AsString();
+
+ CbObjectView System = Info["System"].AsObjectView();
+ int64_t LpCount = System["lp_count"].AsInt64();
+ int64_t TotalMemMiB = System["total_memory_mb"].AsInt64();
+
+ ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM",
+ OS,
+ Arch,
+ LpCount,
+ NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024));
+ }
+ }
+ }
+ catch (...)
+ {
+ }
+
+ if (m_Continuous)
+ {
+ RunContinuous(BaseUri, Path);
+ }
+ else
+ {
+ RunFixedCount(BaseUri, Path);
+ }
+}
+
+void
+BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path)
+{
+ std::atomic<int> NextRequest{0};
+ std::vector<double> AllLatencies;
+ AllLatencies.reserve(m_Count);
+ std::mutex LatencyMutex;
+ std::atomic<int> ErrorCount{0};
+ std::atomic<int64_t> TotalDownloadedBytes{0};
+ std::atomic<int64_t> TotalUploadedBytes{0};
+
+ Stopwatch Timer;
- if (!Ok && !m_SingleProcess)
+ auto WorkerFn = [&]() {
+ std::vector<double> LocalLatencies;
+
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
+ .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClient Http(BaseUri, Settings);
+
+ while (true)
{
+ int RequestIndex = NextRequest.fetch_add(1);
+
+ if (RequestIndex >= m_Count)
+ {
+ break;
+ }
+
try
{
- zen::CreateProcOptions Cpo;
- Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole;
+ HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path);
+
+ if (Resp.IsSuccess())
+ {
+ LocalLatencies.push_back(Resp.ElapsedSeconds);
+ TotalDownloadedBytes.fetch_add(Resp.DownloadedBytes);
+ TotalUploadedBytes.fetch_add(Resp.UploadedBytes);
+ }
+ else
+ {
+ ErrorCount.fetch_add(1);
+ }
+ }
+ catch (const HttpClientError&)
+ {
+ ErrorCount.fetch_add(1);
+ }
+ }
+
+ std::lock_guard Lock(LatencyMutex);
+ AllLatencies.insert(AllLatencies.end(), LocalLatencies.begin(), LocalLatencies.end());
+ };
+
+ std::vector<std::thread> Threads;
+ Threads.reserve(m_Concurrency);
+
+ for (int i = 0; i < m_Concurrency; ++i)
+ {
+ Threads.emplace_back(WorkerFn);
+ }
+
+ for (std::thread& T : Threads)
+ {
+ T.join();
+ }
+
+ double TotalSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ int SuccessCount = static_cast<int>(AllLatencies.size());
+ int TotalCount = SuccessCount + ErrorCount.load();
+
+ std::sort(AllLatencies.begin(), AllLatencies.end());
+
+ auto PercentileMs = [&](int Pct) -> double {
+ if (AllLatencies.empty())
+ {
+ return 0.0;
+ }
+
+ size_t Index = std::min(AllLatencies.size() * static_cast<size_t>(Pct) / 100, AllLatencies.size() - 1);
+
+ return AllLatencies[Index] * 1000.0;
+ };
+
+ double SumMs = 0.0;
+ for (double L : AllLatencies)
+ {
+ SumMs += L * 1000.0;
+ }
+
+ double MeanMs = SuccessCount > 0 ? SumMs / SuccessCount : 0.0;
+ double Rps = TotalSeconds > 0.0 ? TotalCount / TotalSeconds : 0.0;
+
+ uint64_t DownBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadedBytes.load() / TotalSeconds) : 0;
+ uint64_t UpBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadedBytes.load() / TotalSeconds) : 0;
+
+ ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load());
+ ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms",
+ PercentileMs(0),
+ MeanMs,
+ PercentileMs(50),
+ PercentileMs(95),
+ PercentileMs(99),
+ PercentileMs(100));
+ ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)",
+ Rps,
+ NiceBytes(DownBytesPerSec),
+ NiceBytes(UpBytesPerSec),
+ TotalSeconds);
+}
- std::filesystem::path CurExe{zen::GetRunningExecutablePath()};
+void
+BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Path)
+{
+ s_BenchAbort.store(false);
+
+ auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); });
+ auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); });
+
+ metrics::Histogram LatencyHistogram;
+ std::atomic<int64_t> IntervalSuccessCount{0};
+ std::atomic<int64_t> IntervalErrorCount{0};
+ std::atomic<int64_t> IntervalDownloadBytes{0};
+ std::atomic<int64_t> IntervalUploadBytes{0};
+ std::atomic<int64_t> TotalSuccessCount{0};
+ std::atomic<int64_t> TotalErrorCount{0};
+ std::atomic<int64_t> TotalDownloadBytes{0};
+ std::atomic<int64_t> TotalUploadBytes{0};
+
+ Stopwatch RunTimer;
+
+ auto WorkerFn = [&]() {
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
+ .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClient Http(BaseUri, Settings);
- if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench --purge --single"), Cpo))
+ while (!s_BenchAbort.load(std::memory_order_relaxed))
+ {
+ try
+ {
+ HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path);
+
+ if (Resp.IsSuccess())
+ {
+ LatencyHistogram.Update(static_cast<int64_t>(Resp.ElapsedSeconds * 1.0e6));
+ IntervalSuccessCount.fetch_add(1, std::memory_order_relaxed);
+ IntervalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed);
+ IntervalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed);
+ TotalSuccessCount.fetch_add(1, std::memory_order_relaxed);
+ TotalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed);
+ TotalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed);
+ }
+ else
{
- zen::ProcessHandle ProcHandle;
- ProcHandle.Initialize(Cpr);
-
- int ExitCode = ProcHandle.WaitExitCode();
-
- if (ExitCode == 0)
- {
- Ok = true;
- }
- else
- {
- ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode);
- }
+ IntervalErrorCount.fetch_add(1, std::memory_order_relaxed);
+ TotalErrorCount.fetch_add(1, std::memory_order_relaxed);
}
}
- catch (const std::exception& Ex)
+ catch (const HttpClientError&)
{
- ZEN_CONSOLE_ERROR("{}", Ex.what());
+ IntervalErrorCount.fetch_add(1, std::memory_order_relaxed);
+ TotalErrorCount.fetch_add(1, std::memory_order_relaxed);
}
}
+ };
- if (Ok)
+ auto ReporterFn = [&]() {
+ while (!s_BenchAbort.load(std::memory_order_relaxed))
{
- // TODO: could also add reporting on just how much memory was purged
- ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ // Sleep 1s in short increments to stay responsive to abort
+ for (int i = 0; i < 10 && !s_BenchAbort.load(std::memory_order_relaxed); ++i)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ if (s_BenchAbort.load(std::memory_order_relaxed))
+ {
+ break;
+ }
+
+ // Snapshot and reset per-interval counters
+ int64_t Successes = IntervalSuccessCount.exchange(0);
+ int64_t Errors = IntervalErrorCount.exchange(0);
+ int64_t DownBytes = IntervalDownloadBytes.exchange(0);
+ int64_t UpBytes = IntervalUploadBytes.exchange(0);
+
+ // Snapshot and reset latency histogram
+ uint64_t HistCount = LatencyHistogram.Count();
+ int64_t HistMin = LatencyHistogram.Min();
+ int64_t HistMax = LatencyHistogram.Max();
+ double HistMean = LatencyHistogram.Mean();
+ metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot();
+ LatencyHistogram.Clear();
+
+ // Format elapsed as HH:MM:SS
+ int TotalSec = static_cast<int>(RunTimer.GetElapsedTimeMs() / 1000.0);
+ int Hours = TotalSec / 3600;
+ int Minutes = (TotalSec % 3600) / 60;
+ int Secs = TotalSec % 60;
+
+ if (HistCount > 0)
+ {
+ ZEN_CONSOLE(
+ "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} down: {}/s up: {}/s",
+ Hours,
+ Minutes,
+ Secs,
+ Successes,
+ Errors,
+ HistMin / 1000.0,
+ HistMean / 1000.0,
+ Snap.Get95Percentile() / 1000.0,
+ Snap.Get99Percentile() / 1000.0,
+ HistMax / 1000.0,
+ NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, DownBytes))),
+ NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, UpBytes))));
+ }
+ else
+ {
+ ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)",
+ Hours,
+ Minutes,
+ Secs,
+ Errors);
+ }
}
+ };
+
+ std::vector<std::thread> Threads;
+ Threads.reserve(m_Concurrency + 1);
+ Threads.emplace_back(ReporterFn);
+
+ for (int i = 0; i < m_Concurrency; ++i)
+ {
+ Threads.emplace_back(WorkerFn);
}
-#endif
- return;
+ for (std::thread& T : Threads)
+ {
+ T.join();
+ }
+
+ std::signal(SIGINT, PrevSigInt);
+ std::signal(SIGTERM, PrevSigTerm);
+
+ double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0;
+ int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load();
+ uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadBytes.load() / TotalSeconds) : 0;
+ uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadBytes.load() / TotalSeconds) : 0;
+
+ ZEN_CONSOLE("Stopped. Total: {:L} requests, {:L} success, {:L} errors avg throughput: down {}/s up {}/s (elapsed: {:.2f}s)",
+ TotalCount,
+ TotalSuccessCount.load(),
+ TotalErrorCount.load(),
+ NiceBytes(DownPerSec),
+ NiceBytes(UpPerSec),
+ TotalSeconds);
+}
+
+//////////////////////////////////////////////////////////////////////////
+// BenchCommand
+
+BenchCommand::BenchCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value<std::string>(m_SubCommand)->default_value(""), "");
+ m_Options.parse_positional({"subcommand"});
+
+ AddSubCommand(m_PurgeSubCmd);
+ AddSubCommand(m_HttpSubCmd);
}
+BenchCommand::~BenchCommand() = default;
+
} // namespace zen