aboutsummaryrefslogtreecommitdiff
path: root/src/zen
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-10 17:27:26 +0100
committerGitHub Enterprise <[email protected]>2026-03-10 17:27:26 +0100
commitd0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7 (patch)
tree2dfe1e3e0b620043d358e0b7f8bdf8320d985491 /src/zen
parentchangelog entry which was inadvertently omitted from PR merge (diff)
downloadarchived-zen-d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7.tar.xz
archived-zen-d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7.zip
HttpClient using libcurl, Unix Sockets for HTTP. HTTPS support (#770)
The main goal of this change is to eliminate the cpr back-end altogether and replace it with the curl implementation. I would expect to drop cpr as soon as we feel happy with the libcurl back-end. That would leave us with a direct dependency on libcurl only, and cpr can be eliminated as a dependency. ### HttpClient Backend Overhaul - Implemented a new **libcurl-based HttpClient** backend (`httpclientcurl.cpp`, ~2000 lines) as an alternative to the cpr-based one - Made HttpClient backend **configurable at runtime** via constructor arguments and `-httpclient=...` CLI option (for zen, zenserver, and tests) - Extended HttpClient test suite to cover multipart/content-range scenarios ### Unix Domain Socket Support - Added Unix domain socket support to **httpasio** (server side) - Added Unix domain socket support to **HttpClient** - Added Unix domain socket support to **HttpWsClient** (WebSocket client) - Templatized `HttpServerConnectionT<SocketType>` and `WsAsioConnectionT<SocketType>` to handle TCP, Unix, and SSL sockets uniformly via `if constexpr` dispatch ### HTTPS Support - Added **preliminary HTTPS support to httpasio** (for Mac/Linux via OpenSSL) - Added **basic HTTPS support for http.sys** (Windows) - Implemented HTTPS test for httpasio - Split `InitializeServer` into smaller sub-functions for http.sys ### Other Notable Changes - Improved **zenhttp-test stability** with dynamic port allocation - Enhanced port retry logic in http.sys (handles ERROR_ACCESS_DENIED) - Fatal signal/exception handlers for backtrace generation in tests - Added `zen bench http` subcommand to exercise network + HTTP client/server communication stack
Diffstat (limited to 'src/zen')
-rw-r--r--src/zen/bench.cpp47
-rw-r--r--src/zen/cmds/bench_cmd.cpp516
-rw-r--r--src/zen/cmds/bench_cmd.h41
-rw-r--r--src/zen/xmake.lua2
-rw-r--r--src/zen/zen.cpp75
-rw-r--r--src/zen/zen.h7
6 files changed, 630 insertions, 58 deletions
diff --git a/src/zen/bench.cpp b/src/zen/bench.cpp
index 614454ed5..2332ce1b8 100644
--- a/src/zen/bench.cpp
+++ b/src/zen/bench.cpp
@@ -119,6 +119,53 @@ EmptyStandByList()
} // namespace zen::bench::util
+#elif ZEN_PLATFORM_LINUX
+
+# include <fcntl.h>
+# include <unistd.h>
+
+namespace zen::bench::util {
+
+void
+EmptyStandByList()
+{
+ sync();
+
+ int Fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
+ if (Fd < 0)
+ {
+ throw std::runtime_error("Failed to open /proc/sys/vm/drop_caches (are you running as root?)");
+ }
+
+ if (write(Fd, "3", 1) != 1)
+ {
+ close(Fd);
+ throw std::runtime_error("Failed to write to /proc/sys/vm/drop_caches");
+ }
+
+ close(Fd);
+}
+
+} // namespace zen::bench::util
+
+#elif ZEN_PLATFORM_MAC
+
+# include <cstdlib>
+
+namespace zen::bench::util {
+
+void
+EmptyStandByList()
+{
+ int Result = system("/usr/sbin/purge");
+ if (Result != 0)
+ {
+ throw std::runtime_error("Failed to run /usr/sbin/purge (are you running as root?)");
+ }
+}
+
+} // namespace zen::bench::util
+
#else
namespace zen::bench::util {
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp
index b9c45a328..658b42da6 100644
--- a/src/zen/cmds/bench_cmd.cpp
+++ b/src/zen/cmds/bench_cmd.cpp
@@ -3,6 +3,7 @@
#include "bench_cmd.h"
#include "bench.h"
+#include <zencore/compactbinary.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -11,93 +12,514 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
+#include <zentelemetry/stats.h>
+
+#include <algorithm>
+#include <atomic>
+#include <csignal>
+#include <mutex>
+#include <thread>
+
+static std::atomic<bool> s_BenchAbort{false};
namespace zen {
-BenchCommand::BenchCommand()
+//////////////////////////////////////////////////////////////////////////
+// BenchPurgeSubCmd
+
+BenchPurgeSubCmd::BenchPurgeSubCmd()
+: ZenSubCmdBase("purge", "Purge standby memory (system cache)")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_options()("purge",
- "Purge standby memory (system cache)",
- cxxopts::value<bool>(m_PurgeStandbyLists)->default_value("false"));
- m_Options.add_options()("single", "Do not spawn child processes", cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
+ SubOptions().add_options()("single",
+ "Do not spawn child processes",
+ cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
}
-BenchCommand::~BenchCommand() = default;
-
void
-BenchCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions)
{
ZEN_UNUSED(GlobalOptions);
- if (!ParseOptions(argc, argv))
+ bool Ok = false;
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ zen::bench::util::EmptyStandByList();
+
+ Ok = true;
+ }
+ catch (const zen::bench::util::elevation_required_exception&)
+ {
+ ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process");
+ }
+ catch (const std::exception& Ex)
{
- return;
+ ZEN_CONSOLE_ERROR("{}", Ex.what());
}
#if ZEN_PLATFORM_WINDOWS
- if (m_PurgeStandbyLists)
+ if (!Ok && !m_SingleProcess)
{
- bool Ok = false;
-
- zen::Stopwatch Timer;
-
try
{
- zen::bench::util::EmptyStandByList();
+ zen::CreateProcOptions Cpo;
+ Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole;
- Ok = true;
- }
- catch (const zen::bench::util::elevation_required_exception&)
- {
- ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process");
+ std::filesystem::path CurExe{zen::GetRunningExecutablePath()};
+
+ if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench purge --single"), Cpo))
+ {
+ zen::ProcessHandle ProcHandle;
+ ProcHandle.Initialize(Cpr);
+
+ int ExitCode = ProcHandle.WaitExitCode();
+
+ if (ExitCode == 0)
+ {
+ Ok = true;
+ }
+ else
+ {
+ ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode);
+ }
+ }
}
catch (const std::exception& Ex)
{
ZEN_CONSOLE_ERROR("{}", Ex.what());
}
+ }
+#endif
+
+ if (Ok)
+ {
+ // TODO: could also add reporting on just how much memory was purged
+ ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+// BenchHttpSubCmd
+
+
+BenchHttpSubCmd::BenchHttpSubCmd()
+: ZenSubCmdBase("http", "Benchmark an HTTP server")
+{
+ SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value<std::string>(m_Url), "<url>");
+ SubOptions().add_option("",
+ "n",
+ "count",
+ "Number of requests to send",
+ cxxopts::value<int>(m_Count)->default_value("100"),
+ "<count>");
+ SubOptions().add_option("",
+ "c",
+ "concurrency",
+ "Number of concurrent threads",
+ cxxopts::value<int>(m_Concurrency)->default_value("1"),
+ "<threads>");
+ SubOptions().add_option("",
+ "",
+ "method",
+ "HTTP method to use (GET, HEAD)",
+ cxxopts::value<std::string>(m_Method)->default_value("GET"),
+ "<method>");
+ SubOptions().add_option("",
+ "",
+ "unix-socket",
+ "Unix domain socket path (overrides TCP)",
+ cxxopts::value<std::string>(m_SocketPath),
+ "<path>");
+ SubOptions().add_options()("no-keepalive",
+ "Close connection after each request (disables keep-alive)",
+ cxxopts::value<bool>(m_NoKeepAlive)->default_value("false"));
+ SubOptions().add_options()("continuous",
+ "Run until interrupted (Ctrl+C), printing metrics once per second",
+ cxxopts::value<bool>(m_Continuous)->default_value("false"));
+ SubOptions().parse_positional({"url"});
+}
+
+static std::pair<std::string, std::string>
+SplitUrl(std::string_view Url)
+{
+ size_t SchemeEnd = Url.find("://");
+ size_t SearchFrom = (SchemeEnd != std::string_view::npos) ? SchemeEnd + 3 : 0;
+ size_t PathStart = Url.find('/', SearchFrom);
+
+ if (PathStart == std::string_view::npos)
+ {
+ return {std::string(Url), "/"};
+ }
+
+ return {std::string(Url.substr(0, PathStart)), std::string(Url.substr(PathStart))};
+}
+
+void
+BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
+{
+ ZEN_UNUSED(GlobalOptions);
+
+ if (m_Url.empty())
+ {
+ throw OptionParseException("URL is required", SubOptions().help());
+ }
+
+ if (!m_Continuous && m_Count <= 0)
+ {
+ throw OptionParseException("--count must be a positive integer", SubOptions().help());
+ }
+
+ if (m_Concurrency <= 0)
+ {
+ throw OptionParseException("--concurrency must be a positive integer", SubOptions().help());
+ }
+
+ if (m_Method != "GET" && m_Method != "HEAD")
+ {
+ throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method),
+ SubOptions().help());
+ }
+
+ auto [BaseUri, Path] = SplitUrl(m_Url);
+
+ std::string ModeStr = m_Continuous ? "continuous" : fmt::format("count={}", m_Count);
+
+ if (m_SocketPath.empty())
+ {
+ ZEN_CONSOLE("Benchmarking {} {} ({}, concurrency={})", m_Method, m_Url, ModeStr, m_Concurrency);
+ }
+ else
+ {
+ ZEN_CONSOLE("Benchmarking {} {} via {} ({}, concurrency={})", m_Method, m_Url, m_SocketPath, ModeStr, m_Concurrency);
+ }
+
+ // Probe for a zenserver identity. If the target exposes /health/info and the
+ // response contains a BuildVersion field we print a short summary. Any failure
+ // (non-zenserver, timeout, unreachable) is silently ignored.
+ try
+ {
+ HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(2000),
+ .UnixSocketPath = m_SocketPath};
+ HttpClient ProbeHttp(BaseUri, ProbeSettings);
+ HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info");
+
+ if (ProbeResp.IsSuccess())
+ {
+ CbObject Info = ProbeResp.AsObject();
+ std::string_view BuildVersion = Info["BuildVersion"].AsString();
+
+ if (!BuildVersion.empty())
+ {
+ std::string_view Hostname = Info["Hostname"].AsString();
+ int64_t Pid = Info["Pid"].AsInt64();
+ std::string_view HttpServerClass = Info["HttpServerClass"].AsString();
+ ZEN_CONSOLE("Remote : zenserver {} on {} (pid {}, {})", BuildVersion, Hostname, Pid, HttpServerClass);
+
+ std::string_view OS = Info["OS"].AsString();
+ std::string_view Arch = Info["Arch"].AsString();
+
+ CbObjectView System = Info["System"].AsObjectView();
+ int64_t LpCount = System["lp_count"].AsInt64();
+ int64_t TotalMemMiB = System["total_memory_mb"].AsInt64();
+
+ ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM",
+ OS,
+ Arch,
+ LpCount,
+ NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024));
+ }
+ }
+ }
+ catch (...)
+ {
+ }
+
+ if (m_Continuous)
+ {
+ RunContinuous(BaseUri, Path);
+ }
+ else
+ {
+ RunFixedCount(BaseUri, Path);
+ }
+}
+
+void
+BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path)
+{
+ std::atomic<int> NextRequest{0};
+ std::vector<double> AllLatencies;
+ AllLatencies.reserve(m_Count);
+ std::mutex LatencyMutex;
+ std::atomic<int> ErrorCount{0};
+ std::atomic<int64_t> TotalDownloadedBytes{0};
+ std::atomic<int64_t> TotalUploadedBytes{0};
+
+ Stopwatch Timer;
- if (!Ok && !m_SingleProcess)
+ auto WorkerFn = [&]() {
+ std::vector<double> LocalLatencies;
+
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
+ .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClient Http(BaseUri, Settings);
+
+ while (true)
{
+ int RequestIndex = NextRequest.fetch_add(1);
+
+ if (RequestIndex >= m_Count)
+ {
+ break;
+ }
+
try
{
- zen::CreateProcOptions Cpo;
- Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole;
+ HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path);
+
+ if (Resp.IsSuccess())
+ {
+ LocalLatencies.push_back(Resp.ElapsedSeconds);
+ TotalDownloadedBytes.fetch_add(Resp.DownloadedBytes);
+ TotalUploadedBytes.fetch_add(Resp.UploadedBytes);
+ }
+ else
+ {
+ ErrorCount.fetch_add(1);
+ }
+ }
+ catch (const HttpClientError&)
+ {
+ ErrorCount.fetch_add(1);
+ }
+ }
+
+ std::lock_guard Lock(LatencyMutex);
+ AllLatencies.insert(AllLatencies.end(), LocalLatencies.begin(), LocalLatencies.end());
+ };
+
+ std::vector<std::thread> Threads;
+ Threads.reserve(m_Concurrency);
+
+ for (int i = 0; i < m_Concurrency; ++i)
+ {
+ Threads.emplace_back(WorkerFn);
+ }
+
+ for (std::thread& T : Threads)
+ {
+ T.join();
+ }
+
+ double TotalSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ int SuccessCount = static_cast<int>(AllLatencies.size());
+ int TotalCount = SuccessCount + ErrorCount.load();
+
+ std::sort(AllLatencies.begin(), AllLatencies.end());
+
+ auto PercentileMs = [&](int Pct) -> double {
+ if (AllLatencies.empty())
+ {
+ return 0.0;
+ }
+
+ size_t Index = std::min(AllLatencies.size() * static_cast<size_t>(Pct) / 100, AllLatencies.size() - 1);
+
+ return AllLatencies[Index] * 1000.0;
+ };
+
+ double SumMs = 0.0;
+ for (double L : AllLatencies)
+ {
+ SumMs += L * 1000.0;
+ }
+
+ double MeanMs = SuccessCount > 0 ? SumMs / SuccessCount : 0.0;
+ double Rps = TotalSeconds > 0.0 ? TotalCount / TotalSeconds : 0.0;
+
+ uint64_t DownBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadedBytes.load() / TotalSeconds) : 0;
+ uint64_t UpBytesPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadedBytes.load() / TotalSeconds) : 0;
+
+ ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load());
+ ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms",
+ PercentileMs(0),
+ MeanMs,
+ PercentileMs(50),
+ PercentileMs(95),
+ PercentileMs(99),
+ PercentileMs(100));
+ ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)",
+ Rps,
+ NiceBytes(DownBytesPerSec),
+ NiceBytes(UpBytesPerSec),
+ TotalSeconds);
+}
- std::filesystem::path CurExe{zen::GetRunningExecutablePath()};
+void
+BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Path)
+{
+ s_BenchAbort.store(false);
+
+ auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); });
+ auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); });
+
+ metrics::Histogram LatencyHistogram;
+ std::atomic<int64_t> IntervalSuccessCount{0};
+ std::atomic<int64_t> IntervalErrorCount{0};
+ std::atomic<int64_t> IntervalDownloadBytes{0};
+ std::atomic<int64_t> IntervalUploadBytes{0};
+ std::atomic<int64_t> TotalSuccessCount{0};
+ std::atomic<int64_t> TotalErrorCount{0};
+ std::atomic<int64_t> TotalDownloadBytes{0};
+ std::atomic<int64_t> TotalUploadBytes{0};
+
+ Stopwatch RunTimer;
+
+ auto WorkerFn = [&]() {
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
+ .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClient Http(BaseUri, Settings);
- if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench --purge --single"), Cpo))
+ while (!s_BenchAbort.load(std::memory_order_relaxed))
+ {
+ try
+ {
+ HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path);
+
+ if (Resp.IsSuccess())
+ {
+ LatencyHistogram.Update(static_cast<int64_t>(Resp.ElapsedSeconds * 1.0e6));
+ IntervalSuccessCount.fetch_add(1, std::memory_order_relaxed);
+ IntervalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed);
+ IntervalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed);
+ TotalSuccessCount.fetch_add(1, std::memory_order_relaxed);
+ TotalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed);
+ TotalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed);
+ }
+ else
{
- zen::ProcessHandle ProcHandle;
- ProcHandle.Initialize(Cpr);
-
- int ExitCode = ProcHandle.WaitExitCode();
-
- if (ExitCode == 0)
- {
- Ok = true;
- }
- else
- {
- ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode);
- }
+ IntervalErrorCount.fetch_add(1, std::memory_order_relaxed);
+ TotalErrorCount.fetch_add(1, std::memory_order_relaxed);
}
}
- catch (const std::exception& Ex)
+ catch (const HttpClientError&)
{
- ZEN_CONSOLE_ERROR("{}", Ex.what());
+ IntervalErrorCount.fetch_add(1, std::memory_order_relaxed);
+ TotalErrorCount.fetch_add(1, std::memory_order_relaxed);
}
}
+ };
- if (Ok)
+ auto ReporterFn = [&]() {
+ while (!s_BenchAbort.load(std::memory_order_relaxed))
{
- // TODO: could also add reporting on just how much memory was purged
- ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ // Sleep 1s in short increments to stay responsive to abort
+ for (int i = 0; i < 10 && !s_BenchAbort.load(std::memory_order_relaxed); ++i)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ if (s_BenchAbort.load(std::memory_order_relaxed))
+ {
+ break;
+ }
+
+ // Snapshot and reset per-interval counters
+ int64_t Successes = IntervalSuccessCount.exchange(0);
+ int64_t Errors = IntervalErrorCount.exchange(0);
+ int64_t DownBytes = IntervalDownloadBytes.exchange(0);
+ int64_t UpBytes = IntervalUploadBytes.exchange(0);
+
+ // Snapshot and reset latency histogram
+ uint64_t HistCount = LatencyHistogram.Count();
+ int64_t HistMin = LatencyHistogram.Min();
+ int64_t HistMax = LatencyHistogram.Max();
+ double HistMean = LatencyHistogram.Mean();
+ metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot();
+ LatencyHistogram.Clear();
+
+ // Format elapsed as HH:MM:SS
+ int TotalSec = static_cast<int>(RunTimer.GetElapsedTimeMs() / 1000.0);
+ int Hours = TotalSec / 3600;
+ int Minutes = (TotalSec % 3600) / 60;
+ int Secs = TotalSec % 60;
+
+ if (HistCount > 0)
+ {
+ ZEN_CONSOLE(
+ "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} down: {}/s up: {}/s",
+ Hours,
+ Minutes,
+ Secs,
+ Successes,
+ Errors,
+ HistMin / 1000.0,
+ HistMean / 1000.0,
+ Snap.Get95Percentile() / 1000.0,
+ Snap.Get99Percentile() / 1000.0,
+ HistMax / 1000.0,
+ NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, DownBytes))),
+ NiceBytes(static_cast<uint64_t>(std::max(int64_t{0}, UpBytes))));
+ }
+ else
+ {
+ ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)",
+ Hours,
+ Minutes,
+ Secs,
+ Errors);
+ }
}
+ };
+
+ std::vector<std::thread> Threads;
+ Threads.reserve(m_Concurrency + 1);
+ Threads.emplace_back(ReporterFn);
+
+ for (int i = 0; i < m_Concurrency; ++i)
+ {
+ Threads.emplace_back(WorkerFn);
}
-#endif
- return;
+ for (std::thread& T : Threads)
+ {
+ T.join();
+ }
+
+ std::signal(SIGINT, PrevSigInt);
+ std::signal(SIGTERM, PrevSigTerm);
+
+ double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0;
+ int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load();
+ uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadBytes.load() / TotalSeconds) : 0;
+ uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadBytes.load() / TotalSeconds) : 0;
+
+ ZEN_CONSOLE("Stopped. Total: {:L} requests, {:L} success, {:L} errors avg throughput: down {}/s up {}/s (elapsed: {:.2f}s)",
+ TotalCount,
+ TotalSuccessCount.load(),
+ TotalErrorCount.load(),
+ NiceBytes(DownPerSec),
+ NiceBytes(UpPerSec),
+ TotalSeconds);
+}
+
+//////////////////////////////////////////////////////////////////////////
+// BenchCommand
+
+BenchCommand::BenchCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value<std::string>(m_SubCommand)->default_value(""), "");
+ m_Options.parse_positional({"subcommand"});
+
+ AddSubCommand(m_PurgeSubCmd);
+ AddSubCommand(m_HttpSubCmd);
}
+BenchCommand::~BenchCommand() = default;
+
} // namespace zen
diff --git a/src/zen/cmds/bench_cmd.h b/src/zen/cmds/bench_cmd.h
index 7fbf85340..f332b3fcc 100644
--- a/src/zen/cmds/bench_cmd.h
+++ b/src/zen/cmds/bench_cmd.h
@@ -6,7 +6,36 @@
namespace zen {
-class BenchCommand : public ZenCmdBase
+class BenchPurgeSubCmd : public ZenSubCmdBase
+{
+public:
+ BenchPurgeSubCmd();
+ void Run(const ZenCliOptions& GlobalOptions) override;
+
+private:
+ bool m_SingleProcess = false;
+};
+
+class BenchHttpSubCmd : public ZenSubCmdBase
+{
+public:
+ BenchHttpSubCmd();
+ void Run(const ZenCliOptions& GlobalOptions) override;
+
+private:
+ void RunFixedCount(const std::string& BaseUri, const std::string& Path);
+ void RunContinuous(const std::string& BaseUri, const std::string& Path);
+
+ std::string m_Url;
+ std::string m_SocketPath;
+ int m_Count = 100;
+ int m_Concurrency = 1;
+ std::string m_Method = "GET";
+ bool m_NoKeepAlive = false;
+ bool m_Continuous = false;
+};
+
+class BenchCommand : public ZenCmdWithSubCommands
{
public:
static constexpr char Name[] = "bench";
@@ -15,14 +44,14 @@ public:
BenchCommand();
~BenchCommand();
- virtual void Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
- virtual cxxopts::Options& Options() override { return m_Options; }
- virtual ZenCmdCategory& CommandCategory() const override { return g_UtilitiesCategory; }
+ cxxopts::Options& Options() override { return m_Options; }
+ ZenCmdCategory& CommandCategory() const override { return g_UtilitiesCategory; }
private:
cxxopts::Options m_Options{Name, Description};
- bool m_PurgeStandbyLists = false;
- bool m_SingleProcess = false;
+ std::string m_SubCommand;
+ BenchPurgeSubCmd m_PurgeSubCmd;
+ BenchHttpSubCmd m_HttpSubCmd;
};
} // namespace zen
diff --git a/src/zen/xmake.lua b/src/zen/xmake.lua
index f889c3296..4c134404a 100644
--- a/src/zen/xmake.lua
+++ b/src/zen/xmake.lua
@@ -6,7 +6,7 @@ target("zen")
add_files("**.cpp")
add_files("zen.cpp", {unity_ignored = true })
add_deps("zencore", "zenhttp", "zenremotestore", "zenstore", "zenutil")
- add_deps("zencompute", "zennet")
+ add_deps("zencompute", "zennet", "zentelemetry")
add_deps("cxxopts", "fmt")
add_packages("json11")
add_includedirs(".")
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 9a466da2e..86c29344e 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -196,6 +196,7 @@ ZenCmdBase::GetSubCommand(cxxopts::Options&,
ZenSubCmdBase::ZenSubCmdBase(std::string_view Name, std::string_view Description)
: m_SubOptions(std::string(Name), std::string(Description))
+, m_Description(Description)
{
m_SubOptions.add_options()("h,help", "Print help");
}
@@ -213,6 +214,35 @@ ZenCmdWithSubCommands::OnParentOptionsParsed(const ZenCliOptions& /*GlobalOption
}
void
+ZenCmdWithSubCommands::PrintHelp()
+{
+ // Show all option groups except the internal "__hidden__" group used to
+ // silently capture positional arguments.
+ std::vector<std::string> Groups = Options().groups();
+ Groups.erase(std::remove(Groups.begin(), Groups.end(), std::string("__hidden__")), Groups.end());
+
+ Options().set_width(TuiConsoleColumns(80));
+ printf("%s\n", Options().help(Groups).c_str());
+
+ // Append subcommand listing.
+ size_t MaxNameLen = 0;
+ for (ZenSubCmdBase* SubCmd : m_SubCommands)
+ {
+ MaxNameLen = std::max(MaxNameLen, SubCmd->SubOptions().program().size());
+ }
+
+ printf("subcommands:\n");
+ for (ZenSubCmdBase* SubCmd : m_SubCommands)
+ {
+ printf(" %-*s %s\n",
+ static_cast<int>(MaxNameLen),
+ SubCmd->SubOptions().program().c_str(),
+ std::string(SubCmd->Description()).c_str());
+ }
+ printf("\nFor global options run: zen --help\n");
+}
+
+void
ZenCmdWithSubCommands::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
std::vector<cxxopts::Options*> SubOptionPtrs;
@@ -226,15 +256,47 @@ ZenCmdWithSubCommands::Run(const ZenCliOptions& GlobalOptions, int argc, char**
std::vector<char*> SubCommandArguments;
int ParentArgc = GetSubCommand(Options(), argc, argv, SubOptionPtrs, MatchedSubOption, SubCommandArguments);
- if (!ParseOptions(Options(), ParentArgc, argv))
+ // Intercept --help/-h in the parent arg range before calling ParseOptions so
+ // we can append subcommand information to the output. When a subcommand was
+ // found argv[ParentArgc-1] is the subcommand name itself, which we exclude.
+ int ParentArgEnd = (MatchedSubOption != nullptr) ? ParentArgc - 1 : ParentArgc;
+ for (int i = 1; i < ParentArgEnd; ++i)
{
- return;
+ std::string_view Arg(argv[i]);
+ if (Arg == "--help" || Arg == "-h")
+ {
+ PrintHelp();
+ return;
+ }
+ }
+
+ // Parse parent options. When a subcommand was matched we strip its name from
+ // the arg list so the parent parser does not see it as an unmatched positional.
+ if (MatchedSubOption != nullptr)
+ {
+ std::vector<char*> ParentArgs;
+ ParentArgs.reserve(static_cast<size_t>(ParentArgc - 1));
+ ParentArgs.push_back(argv[0]);
+ std::copy(argv + 1, argv + ParentArgc - 1, std::back_inserter(ParentArgs));
+ if (!ParseOptions(Options(), static_cast<int>(ParentArgs.size()), ParentArgs.data()))
+ {
+ return;
+ }
+ }
+ else
+ {
+ if (!ParseOptions(Options(), ParentArgc, argv))
+ {
+ return;
+ }
}
if (MatchedSubOption == nullptr)
{
+ PrintHelp();
+
ExtendableStringBuilder<128> VerbList;
- for (bool First = true; ZenSubCmdBase * SubCmd : m_SubCommands)
+ for (bool First = true; ZenSubCmdBase* SubCmd : m_SubCommands)
{
if (!First)
{
@@ -243,7 +305,7 @@ ZenCmdWithSubCommands::Run(const ZenCliOptions& GlobalOptions, int argc, char**
VerbList.Append(SubCmd->SubOptions().program());
First = false;
}
- throw OptionParseException(fmt::format("No subcommand specified. Available subcommands: {}", VerbList.ToView()), Options().help());
+ throw OptionParseException(fmt::format("No subcommand specified. Available subcommands: {}", VerbList.ToView()), {});
}
ZenSubCmdBase* MatchedSubCmd = nullptr;
@@ -621,6 +683,9 @@ main(int argc, char** argv)
Options.add_options()("malloc", "Configure memory allocator subsystem", cxxopts::value(MemoryOptions)->default_value("mimalloc"));
Options.add_options()("help", "Show command line help");
Options.add_options()("c, command", "Sub command", cxxopts::value<std::string>(SubCommand));
+ Options.add_options()("httpclient",
+ "Select HTTP client implementation (e.g. 'curl', 'cpr')",
+ cxxopts::value<std::string>(GlobalOptions.HttpClientBackend)->default_value("cpr"));
int CoreLimit = 0;
@@ -783,6 +848,8 @@ main(int argc, char** argv)
FreeCallstack(Callstack);
});
+ zen::SetDefaultHttpClientBackend(GlobalOptions.HttpClientBackend);
+
zen::MaximizeOpenFileCount();
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zen/zen.h b/src/zen/zen.h
index 06e5356a6..3cc06eea6 100644
--- a/src/zen/zen.h
+++ b/src/zen/zen.h
@@ -17,6 +17,8 @@ struct ZenCliOptions
ZenLoggingConfig LoggingConfig;
+ std::string HttpClientBackend; // Choice of HTTP client implementation (e.g. "curl", "cpr")
+
// Arguments after " -- " on command line are passed through and not parsed
std::string PassthroughCommandLine;
std::string PassthroughArgs;
@@ -86,10 +88,14 @@ public:
ZenSubCmdBase(std::string_view Name, std::string_view Description);
virtual ~ZenSubCmdBase() = default;
cxxopts::Options& SubOptions() { return m_SubOptions; }
+ std::string_view Description() const { return m_Description; }
virtual void Run(const ZenCliOptions& GlobalOptions) = 0;
protected:
cxxopts::Options m_SubOptions;
+
+private:
+ std::string m_Description;
};
// Base for commands that host subcommands - handles all dispatch boilerplate
@@ -101,6 +107,7 @@ public:
protected:
void AddSubCommand(ZenSubCmdBase& SubCmd);
virtual bool OnParentOptionsParsed(const ZenCliOptions& GlobalOptions);
+ void PrintHelp();
private:
std::vector<ZenSubCmdBase*> m_SubCommands;