aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/cache_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/cache_cmd.cpp')
-rw-r--r--src/zen/cmds/cache_cmd.cpp884
1 files changed, 685 insertions, 199 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp
index a8c15f119..f93a5318c 100644
--- a/src/zen/cmds/cache_cmd.cpp
+++ b/src/zen/cmds/cache_cmd.cpp
@@ -2,25 +2,40 @@
#include "cache_cmd.h"
+#include "zenserviceclient.h"
+
#include <zencore/compactbinarybuilder.h>
#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/process.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
+#include <zencore/stream.h>
#include <zencore/thread.h>
+#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenhttp/formatters.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/httpcommon.h>
#include <zenhttp/packageformat.h>
#include <zenstore/cache/cachepolicy.h>
+#include <zenutil/rpcrecording.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
#include <memory>
#include <random>
namespace zen {
+using namespace std::literals;
+
namespace {
IoBuffer CreateRandomBlob(uint64_t Size)
{
@@ -56,37 +71,112 @@ namespace {
}
} // namespace
-DropCommand::DropCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheCommand
+
+CacheCommand::CacheCommand()
{
m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>");
- m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>");
- m_Options.parse_positional({"namespace", "bucket"});
+
+ AddSubCommand(m_DetailsSubCmd);
+ AddSubCommand(m_DropSubCmd);
+ AddSubCommand(m_GenSubCmd);
+ AddSubCommand(m_GetSubCmd);
+ AddSubCommand(m_InfoSubCmd);
+ AddSubCommand(m_RecordSubCmd);
+ AddSubCommand(m_ReplaySubCmd);
+ AddSubCommand(m_StatsSubCmd);
}
-DropCommand::~DropCommand() = default;
+CacheCommand::~CacheCommand() = default;
-void
-DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+////////////////////////////////////////////////////////////////////////////////
+// CacheSubCmdBase
+
+CacheSubCmdBase::CacheSubCmdBase(std::string_view Name, std::string_view Description) : ZenSubCmdBase(Name, Description)
{
- ZEN_UNUSED(GlobalOptions);
+ m_SubOptions.add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+}
- if (!ParseOptions(argc, argv))
+void
+CacheSubCmdBase::ResolveHost()
+{
+ m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName);
+ if (m_HostName.empty())
{
- return;
+ throw OptionParseException("Unable to resolve server specification", m_SubOptions.help());
}
+}
- m_HostName = ResolveTargetHostSpec(m_HostName);
+////////////////////////////////////////////////////////////////////////////////
+// Legacy shim dispatcher
- if (m_HostName.empty())
+namespace cache_legacy_shim {
+ static void Dispatch(std::span<const std::string_view> Injected, const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
+ // cxxopts treats argv as writable char** in the style of C main(argv).
+ // Stage the injected tokens in writable std::string storage so we never
+ // hand out pointers to string literals.
+ std::vector<std::string> Storage;
+ Storage.reserve(Injected.size());
+ for (std::string_view Token : Injected)
+ {
+ Storage.emplace_back(Token);
+ }
+
+ std::vector<char*> NewArgv;
+ NewArgv.reserve(static_cast<size_t>(argc) + Storage.size());
+ NewArgv.push_back(argv[0]);
+ for (std::string& Token : Storage)
+ {
+ NewArgv.push_back(Token.data());
+ }
+ for (int i = 1; i < argc; ++i)
+ {
+ NewArgv.push_back(argv[i]);
+ }
+
+ CacheCommand Impl;
+ Impl.Run(GlobalOptions, static_cast<int>(NewArgv.size()), NewArgv.data());
}
+ void RunAs(const char* SubCommandName, const ZenCliOptions& GlobalOptions, int argc, char** argv)
+ {
+ const std::string_view Tokens[] = {std::string_view(SubCommandName)};
+ Dispatch(Tokens, GlobalOptions, argc, argv);
+ }
+} // namespace cache_legacy_shim
+
+// RpcStopRecordingCommand is unique among legacy shims in that it needs to
+// inject two tokens ("record" and "stop") rather than a single subcommand name.
+void
+RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+ const std::string_view Tokens[] = {"record"sv, "stop"sv};
+ cache_legacy_shim::Dispatch(Tokens, GlobalOptions, argc, argv);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// CacheDropSubCmd
+
+CacheDropSubCmd::CacheDropSubCmd() : CacheSubCmdBase("drop", "Drop cache namespace or bucket")
+{
+ m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>");
+ m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>");
+ m_SubOptions.parse_positional({"namespace", "bucket"});
+}
+
+void
+CacheDropSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "drop"});
+ HttpClient& Http = Service.Http();
+
if (m_NamespaceName.empty())
{
- throw OptionParseException("'--namespace' is required", m_Options.help());
+ throw OptionParseException("'--namespace' is required", m_SubOptions.help());
}
std::string Url;
@@ -94,18 +184,16 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_BucketName.empty())
{
- DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, m_HostName);
+ DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec());
Url = fmt::format("/z$/{}", m_NamespaceName);
}
else
{
- DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName);
+ DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec());
Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName);
}
ZEN_CONSOLE("Dropping {}", DropDescription);
-
- HttpClient Http = CreateHttpClient(m_HostName);
if (HttpClient::Response Response = Http.Delete(Url))
{
ZEN_CONSOLE("{}", Response.ToText());
@@ -116,54 +204,43 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
-CacheInfoCommand::CacheInfoCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheInfoSubCmd
+
+CacheInfoSubCmd::CacheInfoSubCmd() : CacheSubCmdBase("info", "Info on cache, namespace or bucket")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>");
- m_Options.add_option("",
- "",
- "bucketsizes",
- "Comma delimited list of bucket names to get size info from, * to get info on all buckets",
- cxxopts::value(m_SizeInfoBucketNames),
- "<bucketnames>");
- m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>");
- m_Options.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>");
-
- m_Options.parse_positional({"namespace", "bucket"});
+ m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>");
+ m_SubOptions.add_option("",
+ "",
+ "bucketsizes",
+ "Comma delimited list of bucket names to get size info from, * to get info on all buckets",
+ cxxopts::value(m_SizeInfoBucketNames),
+ "<bucketnames>");
+ m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>");
+ m_SubOptions.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>");
+ m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>");
+ m_SubOptions.parse_positional({"namespace", "bucket"});
}
-CacheInfoCommand::~CacheInfoCommand() = default;
-
void
-CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+CacheInfoSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- ZEN_UNUSED(GlobalOptions);
-
- if (!ParseOptions(argc, argv))
- {
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_HostName.empty())
- {
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
- }
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "info"});
+ HttpClient& Http = Service.Http();
std::string Url;
- if (m_HostName.empty())
+ if (m_NamespaceName.empty())
{
if (!m_SizeInfoBucketNames.empty())
{
- throw OptionParseException("'--bucketsizes' requires '--namespace'", m_Options.help());
+ throw OptionParseException("'--bucketsizes' requires '--namespace'", m_SubOptions.help());
}
if (m_BucketSizeInfo)
{
- throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_Options.help());
+ throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_SubOptions.help());
}
- ZEN_CONSOLE("Info on cache from '{}'", m_HostName);
+ ZEN_CONSOLE("Info on cache from '{}'", Service.HostSpec());
Url = "/z$";
}
else if (m_BucketName.empty())
@@ -171,18 +248,18 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_BucketSizeInfo)
{
throw OptionParseException(fmt::format("'--bucketsize' requires '--namespace' and '--bucket' ('{}')", m_BucketName),
- m_Options.help());
+ m_SubOptions.help());
}
- ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, m_HostName);
+ ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec());
Url = fmt::format("/z$/{}", m_NamespaceName);
}
else
{
if (!m_SizeInfoBucketNames.empty())
{
- throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_Options.help());
+ throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_SubOptions.help());
}
- ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName);
+ ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec());
Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName);
}
@@ -196,8 +273,9 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Parameters.Entries.insert({"bucketsize", "true"});
}
- HttpClient Http = CreateHttpClient(m_HostName);
- if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON), Parameters))
+ const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON;
+
+ if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(AcceptType), Parameters))
{
ZEN_CONSOLE("{}", Response.ToText());
}
@@ -207,76 +285,62 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
-CacheStatsCommand::CacheStatsCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheStatsSubCmd
+
+CacheStatsSubCmd::CacheStatsSubCmd() : CacheSubCmdBase("stats", "Stats on cache")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+ m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>");
}
-CacheStatsCommand::~CacheStatsCommand() = default;
-
void
-CacheStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+CacheStatsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- ZEN_UNUSED(GlobalOptions);
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "stats"});
+ HttpClient& Http = Service.Http();
- if (!ParseOptions(argc, argv))
- {
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_HostName.empty())
- {
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
- }
+ const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON;
- HttpClient Http = CreateHttpClient(m_HostName);
- if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(ZenContentType::kJSON)))
+ if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(AcceptType)))
{
ZEN_CONSOLE("{}", Response.ToText());
}
else
{
- Response.ThrowError("Info failed");
+ Response.ThrowError("Stats failed");
}
}
-CacheDetailsCommand::CacheDetailsCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheDetailsSubCmd
+
+CacheDetailsSubCmd::CacheDetailsSubCmd() : CacheSubCmdBase("details", "Details on cache")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options.add_option("", "c", "csv", "Info on csv format", cxxopts::value(m_CSV), "<csv>");
- m_Options.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>");
- m_Options.add_option("",
- "a",
- "attachmentdetails",
- "Get detailed information about attachments",
- cxxopts::value(m_AttachmentDetails),
- "<attachmentdetails>");
- m_Options.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>");
- m_Options.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>");
- m_Options.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>");
+ m_SubOptions.add_option("", "c", "csv", "Output as CSV instead of JSON", cxxopts::value(m_CSV), "<csv>");
+ m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>");
+ m_SubOptions.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>");
+ m_SubOptions.add_option("",
+ "a",
+ "attachmentdetails",
+ "Get detailed information about attachments",
+ cxxopts::value(m_AttachmentDetails),
+ "<attachmentdetails>");
+ m_SubOptions.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>");
+ m_SubOptions.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>");
+ m_SubOptions.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>");
}
-CacheDetailsCommand::~CacheDetailsCommand() = default;
-
void
-CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+CacheDetailsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- ZEN_UNUSED(GlobalOptions);
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "details"});
+ HttpClient& Http = Service.Http();
- if (!ParseOptions(argc, argv))
+ if (m_CSV && m_YAML)
{
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_HostName.empty())
- {
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
+ throw OptionParseException("'--csv' conflicts with '--yaml'", m_SubOptions.help());
}
HttpClient::KeyValueMap Parameters;
@@ -296,7 +360,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
}
else
{
- Headers = HttpClient::Accept(ZenContentType::kJSON);
+ Headers = HttpClient::Accept(m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON);
}
std::string Url;
@@ -304,11 +368,11 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
{
if (m_Namespace.empty())
{
- throw OptionParseException("'--namespace' is required", m_Options.help());
+ throw OptionParseException("'--namespace' is required", m_SubOptions.help());
}
if (m_Bucket.empty())
{
- throw OptionParseException("'--bucket' is required", m_Options.help());
+ throw OptionParseException("'--bucket' is required", m_SubOptions.help());
}
Url = fmt::format("/z$/details$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey);
}
@@ -316,7 +380,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
{
if (m_Namespace.empty())
{
- throw OptionParseException("'--namespace' is required", m_Options.help());
+ throw OptionParseException("'--namespace' is required", m_SubOptions.help());
}
Url = fmt::format("/z$/details$/{}/{}", m_Namespace, m_Bucket);
}
@@ -329,61 +393,49 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
Url = "/z$/details$";
}
- HttpClient Http = CreateHttpClient(m_HostName);
if (HttpClient::Response Response = Http.Get(Url, Headers, Parameters))
{
ZEN_CONSOLE("{}", Response.ToText());
}
else
{
- Response.ThrowError("Info failed");
+ Response.ThrowError("Details failed");
}
}
-CacheGenerateCommand::CacheGenerateCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheGenSubCmd
+
+CacheGenSubCmd::CacheGenSubCmd() : CacheSubCmdBase("gen", "Generates cache values into a bucket")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options
+ m_SubOptions
.add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>");
- m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>");
- m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>");
- m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>");
- m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>");
- m_Options.add_option("",
- "",
- "min-attachments",
- "Minimum number of attachments when creating record based values",
- cxxopts::value(m_MinAttachmentCount),
- "<minattachments>");
- m_Options.add_option("",
- "",
- "max-attachments",
- "Minimum number of attachments when creating record based values, 0 to only create cache values",
- cxxopts::value(m_MaxAttachmentCount),
- "<maxattachments>");
- m_Options.parse_positional({"namespace", "bucket", "count"});
- m_Options.positional_help("namespace bucket count");
+ m_SubOptions.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>");
+ m_SubOptions.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>");
+ m_SubOptions.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>");
+ m_SubOptions.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>");
+ m_SubOptions.add_option("",
+ "",
+ "min-attachments",
+ "Minimum number of attachments when creating record based values",
+ cxxopts::value(m_MinAttachmentCount),
+ "<minattachments>");
+ m_SubOptions.add_option("",
+ "",
+ "max-attachments",
+ "Minimum number of attachments when creating record based values, 0 to only create cache values",
+ cxxopts::value(m_MaxAttachmentCount),
+ "<maxattachments>");
+ m_SubOptions.parse_positional({"namespace", "bucket", "count"});
+ m_SubOptions.positional_help("namespace bucket count");
}
-CacheGenerateCommand::~CacheGenerateCommand() = default;
-
void
-CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+CacheGenSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- ZEN_UNUSED(GlobalOptions);
-
- if (!ParseOptions(argc, argv))
- {
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_HostName.empty())
- {
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
- }
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "gen"});
+ HttpClient& Http = Service.Http();
if (m_MaxSize == 0 && m_MinSize == 0)
{
@@ -400,6 +452,16 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
}
+ // The size-range expansion below requires MinSize >= 1 (it uses
+ // `MinSize - 1` as a uniform distribution upper bound, which would
+ // underflow on an unsigned zero) and MaxSize >= MinSize.
+ if (m_MinSize == 0 || m_MaxSize < m_MinSize)
+ {
+ throw OptionParseException(
+ fmt::format("'--min-size' ({}) must be >= 1 and '--max-size' ({}) must be >= '--min-size'", m_MinSize, m_MaxSize),
+ m_SubOptions.help());
+ }
+
std::vector<std::uniform_int_distribution<uint64_t>> Variations;
std::vector<size_t> SizeRanges;
SizeRanges.push_back(m_MinSize);
@@ -431,8 +493,6 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
std::uniform_int_distribution<uint64_t> KeyDistribution;
- HttpClient Http = CreateHttpClient(m_HostName);
-
auto GeneratePutCacheValueRequest(
[this, &KeyDistribution, &Generator](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex) -> CbPackage {
CbPackage Package;
@@ -583,68 +643,56 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
}
-CacheGetCommand::CacheGetCommand()
+////////////////////////////////////////////////////////////////////////////////
+// CacheGetSubCmd
+
+CacheGetSubCmd::CacheGetSubCmd() : CacheSubCmdBase("get", "Get cache values/records or attachments from a bucket")
{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
- m_Options
- .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>");
- m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>");
- m_Options.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>");
- m_Options.add_option("",
- "a",
- "attachmenthash",
- "For a cache entry record, get a particular attachment based on the 'RawHash'",
- cxxopts::value(m_AttachmentHash),
- "<attachmenthash>");
- m_Options.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>");
- m_Options.add_option("", "t", "text", "Ouput content of cache entry record as text", cxxopts::value(m_AsText), "<text>");
- m_Options
+ m_SubOptions.add_option("", "n", "namespace", "Namespace of the cache entry", cxxopts::value(m_Namespace), "<namespace>");
+ m_SubOptions.add_option("", "b", "bucket", "Bucket of the cache entry", cxxopts::value(m_Bucket), "<bucket>");
+ m_SubOptions.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>");
+ m_SubOptions.add_option("",
+ "a",
+ "attachmenthash",
+ "For a cache entry record, get a particular attachment based on the 'RawHash'",
+ cxxopts::value(m_AttachmentHash),
+ "<attachmenthash>");
+ m_SubOptions.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>");
+ m_SubOptions.add_option("", "t", "text", "Output content of cache entry record as text", cxxopts::value(m_AsText), "<text>");
+ m_SubOptions
.add_option("", "d", "decompress", "Decompress data when applicable. Default = true", cxxopts::value(m_Decompress), "<decompress>");
- m_Options.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"});
- m_Options.positional_help("namespace bucket valuekey attachmenthash");
+ m_SubOptions.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"});
+ m_SubOptions.positional_help("namespace bucket valuekey attachmenthash");
}
-CacheGetCommand::~CacheGetCommand() = default;
-
void
-CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+CacheGetSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- ZEN_UNUSED(GlobalOptions);
-
using namespace std::literals;
- if (!ParseOptions(argc, argv))
- {
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_HostName.empty())
- {
- throw OptionParseException("Unable to resolve server specification", m_Options.help());
- }
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "get"});
+ HttpClient& Http = Service.Http();
if (m_Namespace.empty())
{
- throw OptionParseException("'--namespace' is required", m_Options.help());
+ throw OptionParseException("'--namespace' is required", m_SubOptions.help());
}
if (m_Bucket.empty())
{
- throw OptionParseException("'--bucket' is required", m_Options.help());
+ throw OptionParseException("'--bucket' is required", m_SubOptions.help());
}
if (m_ValueKey.empty())
{
- throw OptionParseException("'--valuekey' is required", m_Options.help());
+ throw OptionParseException("'--valuekey' is required", m_SubOptions.help());
}
IoHash ValueId;
if (!IoHash::TryParse(m_ValueKey, ValueId))
{
- throw OptionParseException(fmt::format("'--value-key' ('{}') is malformed", m_ValueKey), m_Options.help());
+ throw OptionParseException(fmt::format("'--valuekey' ('{}') is malformed", m_ValueKey), m_SubOptions.help());
}
IoHash AttachmentHash;
@@ -652,11 +700,14 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (!IoHash::TryParse(m_AttachmentHash, AttachmentHash))
{
- throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_Options.help());
+ throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_SubOptions.help());
}
}
- HttpClient Http = CreateHttpClient(m_HostName);
+ if (m_OutputPath.empty() && !m_AsText)
+ {
+ throw OptionParseException("'--output-path' is required (or pass '--as-text' to print to stdout)", m_SubOptions.help());
+ }
if (!m_OutputPath.empty())
{
@@ -669,18 +720,19 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
CreateDirectories(m_OutputPath.parent_path());
}
}
- if (m_OutputPath.empty())
- {
- m_OutputPath = (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash);
- }
- std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey);
+ std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, ValueId);
if (AttachmentHash != IoHash::Zero)
{
Url = fmt::format("{}/{}", Url, AttachmentHash);
}
if (HttpClient::Response Result = Http.Download(Url, std::filesystem::temp_directory_path()); Result)
{
+ // `Http.Download` parks the payload in the system temp dir and returns
+ // a buffer that already has delete-on-close set, so every exit path
+ // (exception, fallback WriteFile, `--as-text` console print) reaps it.
+ // A successful MoveToFile below clears the flag so the payload's
+ // handle-close doesn't delete the caller's output afterwards.
auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer {
IoHash RawHash;
uint64_t RawSize;
@@ -707,7 +759,17 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- if (!MoveToFile(m_OutputPath, ChunkData))
+ if (std::error_code MoveEc = MoveToFile(m_OutputPath, ChunkData); MoveEc)
+ {
+ // The file was renamed into place; clearing DeleteOnClose prevents
+ // the move'd-out file at m_OutputPath from being deleted when the
+ // payload's handle closes. When m_Decompress is false ChunkData
+ // shares a core with ResponsePayload so either clear suffices;
+ // when decompressed ChunkData is in-memory and MoveToFile would
+ // have failed, so we don't reach this branch.
+ Result.ResponsePayload.SetDeleteOnClose(false);
+ }
+ else
{
WriteFile(m_OutputPath, ChunkData);
}
@@ -720,4 +782,428 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
+////////////////////////////////////////////////////////////////////////////////
+// CacheRecordSubCmd
+
+CacheRecordSubCmd::CacheRecordSubCmd()
+: CacheSubCmdBase("record", "Start recording cache rpc requests ('cache record <path>'), or stop ('cache record stop')")
+{
+ m_SubOptions.add_option("", "p", "path", "Recording file path, or 'stop' to stop recording", cxxopts::value(m_Path), "<path>");
+ m_SubOptions.parse_positional("path");
+ m_SubOptions.positional_help("<path>|stop");
+}
+
+void
+CacheRecordSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ ResolveHost();
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "record"});
+ HttpClient& Http = Service.Http();
+
+ if (m_Path == "stop")
+ {
+ if (HttpClient::Response Response = Http.Post("/z$/exec$/stop-recording"sv))
+ {
+ ZEN_CONSOLE("{}", Response.ToText());
+ }
+ else
+ {
+ Response.ThrowError("Failed to stop recording");
+ }
+ return;
+ }
+
+ if (m_Path.empty())
+ {
+ throw OptionParseException("recording path is required (use '<path>' to start, 'stop' to stop)", m_SubOptions.help());
+ }
+
+ if (HttpClient::Response Response =
+ Http.Post("/z$/exec$/start-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_Path}})))
+ {
+ ZEN_CONSOLE("{}", Response.ToText());
+ }
+ else
+ {
+ Response.ThrowError("Failed to start recording");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// CacheReplaySubCmd
+
+CacheReplaySubCmd::CacheReplaySubCmd() : CacheSubCmdBase("replay", "Replays a previously recorded session of rpc requests")
+{
+ m_SubOptions.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+ m_SubOptions.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>");
+ m_SubOptions.add_option("",
+ "w",
+ "numthreads",
+ "Number of worker threads per process",
+ cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())),
+ "<count>");
+ m_SubOptions.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>");
+ m_SubOptions.add_option("",
+ "",
+ "showmethodstats",
+ "Show statistics of which RPC methods are used",
+ cxxopts::value(m_ShowMethodStats),
+ "<showmethodstats>");
+ m_SubOptions.add_option("",
+ "",
+ "offset",
+ "Offset into request recording to start replay",
+ cxxopts::value(m_Offset)->default_value("0"),
+ "<offset>");
+ m_SubOptions.add_option("",
+ "",
+ "stride",
+ "Stride for request recording when replaying requests",
+ cxxopts::value(m_Stride)->default_value("1"),
+ "<stride>");
+ m_SubOptions.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>");
+ m_SubOptions.add_option("",
+ "",
+ "forceallowlocalrefs",
+ "Force enable local refs in requests",
+ cxxopts::value(m_ForceAllowLocalRefs),
+ "<enable>");
+ m_SubOptions
+ .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), "<enable>");
+ m_SubOptions.add_option("",
+ "",
+ "forceallowlocalhandlerefs",
+ "Force enable local refs as handles in requests",
+ cxxopts::value(m_ForceAllowLocalHandleRef),
+ "<enable>");
+ m_SubOptions.add_option("",
+ "",
+ "disablelocalhandlerefs",
+ "Force disable local refs as handles in requests",
+ cxxopts::value(m_DisableLocalHandleRefs),
+ "<enable>");
+ m_SubOptions.add_option("",
+ "",
+ "forceallowpartiallocalrefs",
+ "Force enable local refs for all sizes",
+ cxxopts::value(m_ForceAllowPartialLocalRefs),
+ "<enable>");
+ m_SubOptions.add_option("",
+ "",
+ "disablepartiallocalrefs",
+ "Force disable local refs for all sizes",
+ cxxopts::value(m_DisablePartialLocalRefs),
+ "<enable>");
+ m_SubOptions.parse_positional("path");
+}
+
+void
+CacheReplaySubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ if (m_RecordingPath.empty())
+ {
+ throw OptionParseException("'--path' is required", m_SubOptions.help());
+ }
+
+ if (!IsDir(m_RecordingPath))
+ {
+ throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath));
+ }
+
+ if (m_Stride == 0)
+ {
+ throw OptionParseException("'--stride' must be >= 1", m_SubOptions.help());
+ }
+
+ m_ThreadCount = Max(m_ThreadCount, 1);
+
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "replay"});
+ m_HostName = Service.HostSpec();
+
+ ZEN_CONSOLE("Replay '{}' (start offset {}, stride {}) to '{}', {} threads",
+ m_RecordingPath,
+ m_Offset,
+ m_Stride,
+ m_HostName,
+ m_ThreadCount);
+
+ Stopwatch TotalTimer;
+
+ if (m_OnHost)
+ {
+ HttpClient& Http = Service.Http();
+ if (HttpClient::Response Response =
+ Http.Post("/z$/exec$/replay-recording"sv,
+ HttpClient::KeyValueMap{},
+ HttpClient::KeyValueMap({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}})))
+ {
+ ZEN_CONSOLE("{}", Response.ToText());
+
+ return;
+ }
+ else
+ {
+ Response.ThrowError("Failed to start replay");
+ }
+ }
+
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true);
+ uint64_t EntryCount = Replayer->GetRequestCount();
+
+ if (m_Offset >= EntryCount)
+ {
+ ZEN_CONSOLE("Offset {} is at or past the end of the recording ({} entries); nothing to replay", m_Offset, EntryCount);
+ return;
+ }
+
+ std::atomic_uint64_t EntryOffset = m_Offset;
+ std::atomic_uint64_t BytesSent = 0;
+ std::atomic_uint64_t BytesReceived = 0;
+
+ Stopwatch Timer;
+
+ // The subcommand API does not receive argv, so look the zen executable path
+ // up from the current process to spawn child workers.
+ const std::filesystem::path SelfExePath = GetRunningExecutablePath();
+
+ if (m_ProcessCount > 1)
+ {
+ std::vector<std::unique_ptr<ProcessHandle>> WorkerProcesses;
+ WorkerProcesses.resize(m_ProcessCount);
+
+ ProcessMonitor Monitor;
+ for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex)
+ {
+ std::string CommandLine =
+ fmt::format("{} cache replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv,
+ SelfExePath.string(),
+ m_HostName,
+ m_RecordingPath,
+ m_Stride == 1 ? 0 : m_Offset + ProcessIndex,
+ m_Stride,
+ m_ThreadCount,
+ 1);
+ CreateProcResult Result(CreateProc(SelfExePath, CommandLine));
+ WorkerProcesses[ProcessIndex] = std::make_unique<ProcessHandle>();
+ WorkerProcesses[ProcessIndex]->Initialize(Result);
+ Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid());
+ }
+ while (Monitor.IsRunning())
+ {
+ ZEN_CONSOLE("Waiting for worker processes...");
+ Sleep(1000);
+ }
+ return;
+ }
+ else
+ {
+ std::map<std::string, size_t> MethodTypes;
+ RwLock MethodTypesLock;
+
+ WorkerThreadPool WorkerPool(m_ThreadCount);
+
+ Latch WorkLatch(m_ThreadCount);
+ for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex)
+ {
+ WorkerPool.ScheduleWork(
+ [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::map<std::string, size_t> LocalMethodTypes;
+
+ auto ReduceTypes = MakeGuard([&] {
+ RwLock::ExclusiveLockScope __(MethodTypesLock);
+
+ for (auto& Entry : LocalMethodTypes)
+ {
+ MethodTypes[Entry.first] += Entry.second;
+ }
+ });
+
+ HttpClient Http = CacheCommand::CreateHttpClient(m_HostName);
+
+ uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
+ while (EntryIndex < EntryCount)
+ {
+ IoBuffer Payload;
+ const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
+
+ if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest)
+ {
+ CbPackage RequestPackage;
+ CbObject Request;
+
+ switch (RequestInfo.ContentType)
+ {
+ case ZenContentType::kCbPackage:
+ {
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ {
+ Request = RequestPackage.GetObject();
+ }
+ }
+ break;
+ case ZenContentType::kCbObject:
+ {
+ Request = LoadCompactBinaryObject(Payload);
+ }
+ break;
+ }
+
+ RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
+ int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
+
+ int AdjustedPid = 0;
+ RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+
+ if (!m_DisableLocalRefs)
+ {
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) ||
+ m_ForceAllowLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
+ if (!m_DisablePartialLocalRefs)
+ {
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
+ m_ForceAllowPartialLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ }
+ }
+ if (!m_DisableLocalHandleRefs)
+ {
+ if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ {
+ AdjustedPid = GetCurrentProcessId();
+ }
+ }
+ }
+ }
+
+ if (m_ShowMethodStats)
+ {
+ std::string MethodName = std::string(Request["Method"sv].AsString());
+ if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
+ {
+ It->second++;
+ }
+ else
+ {
+ LocalMethodTypes[MethodName] = 1;
+ }
+ }
+
+ if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
+ {
+ CbObjectWriter RequestCopyWriter;
+ for (const CbFieldView& Field : Request)
+ {
+ if (!Field.HasName())
+ {
+ RequestCopyWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "Pid"sv)
+ {
+ continue;
+ }
+ if (FieldName == "AcceptFlags"sv)
+ {
+ continue;
+ }
+ RequestCopyWriter.AddField(FieldName, Field);
+ }
+ if (AdjustedPid != 0)
+ {
+ RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
+ }
+ if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
+ {
+ RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
+ }
+
+ if (RequestInfo.ContentType == ZenContentType::kCbPackage)
+ {
+ RequestPackage.SetObject(RequestCopyWriter.Save());
+ std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
+ std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
+ Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
+ }
+ else
+ {
+ RequestCopyWriter.Finalize();
+ Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
+ RequestCopyWriter.Save(Payload.GetMutableView());
+ }
+ }
+
+ if (!m_DryRun)
+ {
+ Http.SetSessionId(RequestInfo.SessionId);
+ Payload.SetContentType(RequestInfo.ContentType);
+
+ HttpClient::Response Response =
+ Http.Post("/z$/$rpc", Payload, {HttpClient::Accept(RequestInfo.AcceptType)});
+
+ BytesSent.fetch_add(Payload.GetSize());
+ if (!Response)
+ {
+ ZEN_CONSOLE_ERROR("{}", Response);
+ break;
+ }
+ BytesReceived.fetch_add(Response.DownloadedBytes);
+ }
+ }
+
+ EntryIndex = EntryOffset.fetch_add(m_Stride);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+
+ while (!WorkLatch.Wait(1000))
+ {
+ // EntryCount > m_Offset is guaranteed by the early-return above.
+ // EntryOffset atomically overshoots EntryCount (fetch_add past the
+ // end) when the workload finishes, so clamp before subtracting.
+ const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride;
+ const uint64_t CurrentOffset = EntryOffset.load();
+ const uint64_t RequestsRemaining = CurrentOffset < EntryCount ? (EntryCount - CurrentOffset) / m_Stride : 0;
+ const uint64_t PercentDone = RequestsTotal > 0 ? (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal : 100;
+
+ ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})",
+ PercentDone,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ RequestsTotal,
+ RequestsRemaining,
+ NiceBytes(BytesSent.load()),
+ NiceBytes(BytesReceived.load()));
+ }
+
+ if (m_ShowMethodStats)
+ {
+ for (const auto& It : MethodTypes)
+ {
+ ZEN_CONSOLE("{:18}: {:10}", It.first, It.second);
+ }
+ }
+ }
+
+ const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride;
+ const uint64_t ElapsedMS = Timer.GetElapsedTimeMs();
+ const uint64_t Sent = BytesSent.load();
+ const uint64_t Received = BytesReceived.load();
+
+ ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}",
+ RequestsSent,
+ NiceRate(RequestsSent, ElapsedMS, "req"),
+ NiceBytes(Sent),
+ NiceByteRate(Sent, ElapsedMS),
+ NiceBytes(Received),
+ NiceByteRate(Received, ElapsedMS),
+ NiceTimeSpanMs(ElapsedMS),
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
+}
+
} // namespace zen