diff options
Diffstat (limited to 'src/zen/cmds/cache_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/cache_cmd.cpp | 884 |
1 files changed, 685 insertions, 199 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp index a8c15f119..f93a5318c 100644 --- a/src/zen/cmds/cache_cmd.cpp +++ b/src/zen/cmds/cache_cmd.cpp @@ -2,25 +2,40 @@ #include "cache_cmd.h" +#include "zenserviceclient.h" + #include <zencore/compactbinarybuilder.h> #include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/process.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> +#include <zencore/stream.h> #include <zencore/thread.h> +#include <zencore/timer.h> #include <zencore/workthreadpool.h> +#include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> #include <zenhttp/packageformat.h> #include <zenstore/cache/cachepolicy.h> +#include <zenutil/rpcrecording.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END #include <memory> #include <random> namespace zen { +using namespace std::literals; + namespace { IoBuffer CreateRandomBlob(uint64_t Size) { @@ -56,37 +71,112 @@ namespace { } } // namespace -DropCommand::DropCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheCommand + +CacheCommand::CacheCommand() { m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); - m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); - m_Options.parse_positional({"namespace", "bucket"}); + + AddSubCommand(m_DetailsSubCmd); + AddSubCommand(m_DropSubCmd); + AddSubCommand(m_GenSubCmd); + AddSubCommand(m_GetSubCmd); + AddSubCommand(m_InfoSubCmd); + AddSubCommand(m_RecordSubCmd); + AddSubCommand(m_ReplaySubCmd); + AddSubCommand(m_StatsSubCmd); } -DropCommand::~DropCommand() = default; +CacheCommand::~CacheCommand() = default; -void -DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +//////////////////////////////////////////////////////////////////////////////// +// CacheSubCmdBase + +CacheSubCmdBase::CacheSubCmdBase(std::string_view Name, std::string_view Description) : ZenSubCmdBase(Name, Description) { - ZEN_UNUSED(GlobalOptions); + m_SubOptions.add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); +} - if (!ParseOptions(argc, argv)) +void +CacheSubCmdBase::ResolveHost() +{ + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) { - return; + throw OptionParseException("Unable to resolve server specification", m_SubOptions.help()); } +} - m_HostName = ResolveTargetHostSpec(m_HostName); +//////////////////////////////////////////////////////////////////////////////// +// Legacy shim dispatcher - if (m_HostName.empty()) +namespace cache_legacy_shim { + static void Dispatch(std::span<const std::string_view> Injected, const ZenCliOptions& GlobalOptions, int argc, char** argv) { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); + // cxxopts treats argv as writable char** in the style of C main(argv). + // Stage the injected tokens in writable std::string storage so we never + // hand out pointers to string literals. + std::vector<std::string> Storage; + Storage.reserve(Injected.size()); + for (std::string_view Token : Injected) + { + Storage.emplace_back(Token); + } + + std::vector<char*> NewArgv; + NewArgv.reserve(static_cast<size_t>(argc) + Storage.size()); + NewArgv.push_back(argv[0]); + for (std::string& Token : Storage) + { + NewArgv.push_back(Token.data()); + } + for (int i = 1; i < argc; ++i) + { + NewArgv.push_back(argv[i]); + } + + CacheCommand Impl; + Impl.Run(GlobalOptions, static_cast<int>(NewArgv.size()), NewArgv.data()); } + void RunAs(const char* SubCommandName, const ZenCliOptions& GlobalOptions, int argc, char** argv) + { + const std::string_view Tokens[] = {std::string_view(SubCommandName)}; + Dispatch(Tokens, GlobalOptions, argc, argv); + } +} // namespace cache_legacy_shim + +// RpcStopRecordingCommand is unique among legacy shims in that it needs to +// inject two tokens ("record" and "stop") rather than a single subcommand name. +void +RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + const std::string_view Tokens[] = {"record"sv, "stop"sv}; + cache_legacy_shim::Dispatch(Tokens, GlobalOptions, argc, argv); +} + +//////////////////////////////////////////////////////////////////////////////// +// CacheDropSubCmd + +CacheDropSubCmd::CacheDropSubCmd() : CacheSubCmdBase("drop", "Drop cache namespace or bucket") +{ + m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); + m_SubOptions.parse_positional({"namespace", "bucket"}); +} + +void +CacheDropSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "drop"}); + HttpClient& Http = Service.Http(); + if (m_NamespaceName.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } std::string Url; @@ -94,18 +184,16 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_BucketName.empty()) { - DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, m_HostName); + DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec()); Url = fmt::format("/z$/{}", m_NamespaceName); } else { - DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName); + DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec()); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } ZEN_CONSOLE("Dropping {}", DropDescription); - - HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Delete(Url)) { ZEN_CONSOLE("{}", Response.ToText()); @@ -116,54 +204,43 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } -CacheInfoCommand::CacheInfoCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheInfoSubCmd + +CacheInfoSubCmd::CacheInfoSubCmd() : CacheSubCmdBase("info", "Info on cache, namespace or bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); - m_Options.add_option("", - "", - "bucketsizes", - "Comma delimited list of bucket names to get size info from, * to get info on all buckets", - cxxopts::value(m_SizeInfoBucketNames), - "<bucketnames>"); - m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); - m_Options.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>"); - - m_Options.parse_positional({"namespace", "bucket"}); + m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); + m_SubOptions.add_option("", + "", + "bucketsizes", + "Comma delimited list of bucket names to get size info from, * to get info on all buckets", + cxxopts::value(m_SizeInfoBucketNames), + "<bucketnames>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); + m_SubOptions.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); + m_SubOptions.parse_positional({"namespace", "bucket"}); } -CacheInfoCommand::~CacheInfoCommand() = default; - void -CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheInfoSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "info"}); + HttpClient& Http = Service.Http(); std::string Url; - if (m_HostName.empty()) + if (m_NamespaceName.empty()) { if (!m_SizeInfoBucketNames.empty()) { - throw OptionParseException("'--bucketsizes' requires '--namespace'", m_Options.help()); + throw OptionParseException("'--bucketsizes' requires '--namespace'", m_SubOptions.help()); } if (m_BucketSizeInfo) { - throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_Options.help()); + throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_SubOptions.help()); } - ZEN_CONSOLE("Info on cache from '{}'", m_HostName); + ZEN_CONSOLE("Info on cache from '{}'", Service.HostSpec()); Url = "/z$"; } else if (m_BucketName.empty()) @@ -171,18 +248,18 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_BucketSizeInfo) { throw OptionParseException(fmt::format("'--bucketsize' requires '--namespace' and '--bucket' ('{}')", m_BucketName), - m_Options.help()); + m_SubOptions.help()); } - ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, m_HostName); + ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec()); Url = fmt::format("/z$/{}", m_NamespaceName); } else { if (!m_SizeInfoBucketNames.empty()) { - throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_Options.help()); + throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_SubOptions.help()); } - ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName); + ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec()); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } @@ -196,8 +273,9 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Parameters.Entries.insert({"bucketsize", "true"}); } - HttpClient Http = CreateHttpClient(m_HostName); - if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON), Parameters)) + const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON; + + if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(AcceptType), Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } @@ -207,76 +285,62 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } -CacheStatsCommand::CacheStatsCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheStatsSubCmd + +CacheStatsSubCmd::CacheStatsSubCmd() : CacheSubCmdBase("stats", "Stats on cache") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); } -CacheStatsCommand::~CacheStatsCommand() = default; - void -CacheStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheStatsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "stats"}); + HttpClient& Http = Service.Http(); - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON; - HttpClient Http = CreateHttpClient(m_HostName); - if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(ZenContentType::kJSON))) + if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(AcceptType))) { ZEN_CONSOLE("{}", Response.ToText()); } else { - Response.ThrowError("Info failed"); + Response.ThrowError("Stats failed"); } } -CacheDetailsCommand::CacheDetailsCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheDetailsSubCmd + +CacheDetailsSubCmd::CacheDetailsSubCmd() : CacheSubCmdBase("details", "Details on cache") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "c", "csv", "Info on csv format", cxxopts::value(m_CSV), "<csv>"); - m_Options.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>"); - m_Options.add_option("", - "a", - "attachmentdetails", - "Get detailed information about attachments", - cxxopts::value(m_AttachmentDetails), - "<attachmentdetails>"); - m_Options.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>"); - m_Options.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>"); + m_SubOptions.add_option("", "c", "csv", "Output as CSV instead of JSON", cxxopts::value(m_CSV), "<csv>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); + m_SubOptions.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>"); + m_SubOptions.add_option("", + "a", + "attachmentdetails", + "Get detailed information about attachments", + cxxopts::value(m_AttachmentDetails), + "<attachmentdetails>"); + m_SubOptions.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>"); + m_SubOptions.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>"); } -CacheDetailsCommand::~CacheDetailsCommand() = default; - void -CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheDetailsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "details"}); + HttpClient& Http = Service.Http(); - if (!ParseOptions(argc, argv)) + if (m_CSV && m_YAML) { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); + throw OptionParseException("'--csv' conflicts with '--yaml'", m_SubOptions.help()); } HttpClient::KeyValueMap Parameters; @@ -296,7 +360,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar } else { - Headers = HttpClient::Accept(ZenContentType::kJSON); + Headers = HttpClient::Accept(m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON); } std::string Url; @@ -304,11 +368,11 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar { if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { - throw OptionParseException("'--bucket' is required", m_Options.help()); + throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); } @@ -316,7 +380,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar { if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}", m_Namespace, m_Bucket); } @@ -329,61 +393,49 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar Url = "/z$/details$"; } - HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Get(Url, Headers, Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { - Response.ThrowError("Info failed"); + Response.ThrowError("Details failed"); } } -CacheGenerateCommand::CacheGenerateCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheGenSubCmd + +CacheGenSubCmd::CacheGenSubCmd() : CacheSubCmdBase("gen", "Generates cache values into a bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options + m_SubOptions .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>"); - m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>"); - m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>"); - m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>"); - m_Options.add_option("", - "", - "min-attachments", - "Minimum number of attachments when creating record based values", - cxxopts::value(m_MinAttachmentCount), - "<minattachments>"); - m_Options.add_option("", - "", - "max-attachments", - "Minimum number of attachments when creating record based values, 0 to only create cache values", - cxxopts::value(m_MaxAttachmentCount), - "<maxattachments>"); - m_Options.parse_positional({"namespace", "bucket", "count"}); - m_Options.positional_help("namespace bucket count"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>"); + m_SubOptions.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>"); + m_SubOptions.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>"); + m_SubOptions.add_option("", + "", + "min-attachments", + "Minimum number of attachments when creating record based values", + cxxopts::value(m_MinAttachmentCount), + "<minattachments>"); + m_SubOptions.add_option("", + "", + "max-attachments", + "Minimum number of attachments when creating record based values, 0 to only create cache values", + cxxopts::value(m_MaxAttachmentCount), + "<maxattachments>"); + m_SubOptions.parse_positional({"namespace", "bucket", "count"}); + m_SubOptions.positional_help("namespace bucket count"); } -CacheGenerateCommand::~CacheGenerateCommand() = default; - void -CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheGenSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "gen"}); + HttpClient& Http = Service.Http(); if (m_MaxSize == 0 && m_MinSize == 0) { @@ -400,6 +452,16 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } + // The size-range expansion below requires MinSize >= 1 (it uses + // `MinSize - 1` as a uniform distribution upper bound, which would + // underflow on an unsigned zero) and MaxSize >= MinSize. + if (m_MinSize == 0 || m_MaxSize < m_MinSize) + { + throw OptionParseException( + fmt::format("'--min-size' ({}) must be >= 1 and '--max-size' ({}) must be >= '--min-size'", m_MinSize, m_MaxSize), + m_SubOptions.help()); + } + std::vector<std::uniform_int_distribution<uint64_t>> Variations; std::vector<size_t> SizeRanges; SizeRanges.push_back(m_MinSize); @@ -431,8 +493,6 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a std::uniform_int_distribution<uint64_t> KeyDistribution; - HttpClient Http = CreateHttpClient(m_HostName); - auto GeneratePutCacheValueRequest( [this, &KeyDistribution, &Generator](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex) -> CbPackage { CbPackage Package; @@ -583,68 +643,56 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } -CacheGetCommand::CacheGetCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheGetSubCmd + +CacheGetSubCmd::CacheGetSubCmd() : CacheSubCmdBase("get", "Get cache values/records or attachments from a bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options - .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>"); - m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>"); - m_Options.add_option("", - "a", - "attachmenthash", - "For a cache entry record, get a particular attachment based on the 'RawHash'", - cxxopts::value(m_AttachmentHash), - "<attachmenthash>"); - m_Options.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>"); - m_Options.add_option("", "t", "text", "Ouput content of cache entry record as text", cxxopts::value(m_AsText), "<text>"); - m_Options + m_SubOptions.add_option("", "n", "namespace", "Namespace of the cache entry", cxxopts::value(m_Namespace), "<namespace>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket of the cache entry", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>"); + m_SubOptions.add_option("", + "a", + "attachmenthash", + "For a cache entry record, get a particular attachment based on the 'RawHash'", + cxxopts::value(m_AttachmentHash), + "<attachmenthash>"); + m_SubOptions.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>"); + m_SubOptions.add_option("", "t", "text", "Output content of cache entry record as text", cxxopts::value(m_AsText), "<text>"); + m_SubOptions .add_option("", "d", "decompress", "Decompress data when applicable. Default = true", cxxopts::value(m_Decompress), "<decompress>"); - m_Options.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); - m_Options.positional_help("namespace bucket valuekey attachmenthash"); + m_SubOptions.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); + m_SubOptions.positional_help("namespace bucket valuekey attachmenthash"); } -CacheGetCommand::~CacheGetCommand() = default; - void -CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheGetSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - using namespace std::literals; - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "get"}); + HttpClient& Http = Service.Http(); if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { - throw OptionParseException("'--bucket' is required", m_Options.help()); + throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } if (m_ValueKey.empty()) { - throw OptionParseException("'--valuekey' is required", m_Options.help()); + throw OptionParseException("'--valuekey' is required", m_SubOptions.help()); } IoHash ValueId; if (!IoHash::TryParse(m_ValueKey, ValueId)) { - throw OptionParseException(fmt::format("'--value-key' ('{}') is malformed", m_ValueKey), m_Options.help()); + throw OptionParseException(fmt::format("'--valuekey' ('{}') is malformed", m_ValueKey), m_SubOptions.help()); } IoHash AttachmentHash; @@ -652,11 +700,14 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (!IoHash::TryParse(m_AttachmentHash, AttachmentHash)) { - throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_Options.help()); + throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_SubOptions.help()); } } - HttpClient Http = CreateHttpClient(m_HostName); + if (m_OutputPath.empty() && !m_AsText) + { + throw OptionParseException("'--output-path' is required (or pass '--as-text' to print to stdout)", m_SubOptions.help()); + } if (!m_OutputPath.empty()) { @@ -669,18 +720,19 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) CreateDirectories(m_OutputPath.parent_path()); } } - if (m_OutputPath.empty()) - { - m_OutputPath = (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash); - } - std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); + std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, ValueId); if (AttachmentHash != IoHash::Zero) { Url = fmt::format("{}/{}", Url, AttachmentHash); } if (HttpClient::Response Result = Http.Download(Url, std::filesystem::temp_directory_path()); Result) { + // `Http.Download` parks the payload in the system temp dir and returns + // a buffer that already has delete-on-close set, so every exit path + // (exception, fallback WriteFile, `--as-text` console print) reaps it. + // A successful MoveToFile below clears the flag so the payload's + // handle-close doesn't delete the caller's output afterwards. auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { IoHash RawHash; uint64_t RawSize; @@ -707,7 +759,17 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - if (!MoveToFile(m_OutputPath, ChunkData)) + if (std::error_code MoveEc = MoveToFile(m_OutputPath, ChunkData); MoveEc) + { + // The file was renamed into place; clearing DeleteOnClose prevents + // the move'd-out file at m_OutputPath from being deleted when the + // payload's handle closes. When m_Decompress is false ChunkData + // shares a core with ResponsePayload so either clear suffices; + // when decompressed ChunkData is in-memory and MoveToFile would + // have failed, so we don't reach this branch. + Result.ResponsePayload.SetDeleteOnClose(false); + } + else { WriteFile(m_OutputPath, ChunkData); } @@ -720,4 +782,428 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } +//////////////////////////////////////////////////////////////////////////////// +// CacheRecordSubCmd + +CacheRecordSubCmd::CacheRecordSubCmd() +: CacheSubCmdBase("record", "Start recording cache rpc requests ('cache record <path>'), or stop ('cache record stop')") +{ + m_SubOptions.add_option("", "p", "path", "Recording file path, or 'stop' to stop recording", cxxopts::value(m_Path), "<path>"); + m_SubOptions.parse_positional("path"); + m_SubOptions.positional_help("<path>|stop"); +} + +void +CacheRecordSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + ResolveHost(); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "record"}); + HttpClient& Http = Service.Http(); + + if (m_Path == "stop") + { + if (HttpClient::Response Response = Http.Post("/z$/exec$/stop-recording"sv)) + { + ZEN_CONSOLE("{}", Response.ToText()); + } + else + { + Response.ThrowError("Failed to stop recording"); + } + return; + } + + if (m_Path.empty()) + { + throw OptionParseException("recording path is required (use '<path>' to start, 'stop' to stop)", m_SubOptions.help()); + } + + if (HttpClient::Response Response = + Http.Post("/z$/exec$/start-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_Path}}))) + { + ZEN_CONSOLE("{}", Response.ToText()); + } + else + { + Response.ThrowError("Failed to start recording"); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// CacheReplaySubCmd + +CacheReplaySubCmd::CacheReplaySubCmd() : CacheSubCmdBase("replay", "Replays a previously recorded session of rpc requests") +{ + m_SubOptions.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_SubOptions.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); + m_SubOptions.add_option("", + "w", + "numthreads", + "Number of worker threads per process", + cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())), + "<count>"); + m_SubOptions.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>"); + m_SubOptions.add_option("", + "", + "showmethodstats", + "Show statistics of which RPC methods are used", + cxxopts::value(m_ShowMethodStats), + "<showmethodstats>"); + m_SubOptions.add_option("", + "", + "offset", + "Offset into request recording to start replay", + cxxopts::value(m_Offset)->default_value("0"), + "<offset>"); + m_SubOptions.add_option("", + "", + "stride", + "Stride for request recording when replaying requests", + cxxopts::value(m_Stride)->default_value("1"), + "<stride>"); + m_SubOptions.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>"); + m_SubOptions.add_option("", + "", + "forceallowlocalrefs", + "Force enable local refs in requests", + cxxopts::value(m_ForceAllowLocalRefs), + "<enable>"); + m_SubOptions + .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), "<enable>"); + m_SubOptions.add_option("", + "", + "forceallowlocalhandlerefs", + "Force enable local refs as handles in requests", + cxxopts::value(m_ForceAllowLocalHandleRef), + "<enable>"); + m_SubOptions.add_option("", + "", + "disablelocalhandlerefs", + "Force disable local refs as handles in requests", + cxxopts::value(m_DisableLocalHandleRefs), + "<enable>"); + m_SubOptions.add_option("", + "", + "forceallowpartiallocalrefs", + "Force enable local refs for all sizes", + cxxopts::value(m_ForceAllowPartialLocalRefs), + "<enable>"); + m_SubOptions.add_option("", + "", + "disablepartiallocalrefs", + "Force disable local refs for all sizes", + cxxopts::value(m_DisablePartialLocalRefs), + "<enable>"); + m_SubOptions.parse_positional("path"); +} + +void +CacheReplaySubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + if (m_RecordingPath.empty()) + { + throw OptionParseException("'--path' is required", m_SubOptions.help()); + } + + if (!IsDir(m_RecordingPath)) + { + throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); + } + + if (m_Stride == 0) + { + throw OptionParseException("'--stride' must be >= 1", m_SubOptions.help()); + } + + m_ThreadCount = Max(m_ThreadCount, 1); + + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "replay"}); + m_HostName = Service.HostSpec(); + + ZEN_CONSOLE("Replay '{}' (start offset {}, stride {}) to '{}', {} threads", + m_RecordingPath, + m_Offset, + m_Stride, + m_HostName, + m_ThreadCount); + + Stopwatch TotalTimer; + + if (m_OnHost) + { + HttpClient& Http = Service.Http(); + if (HttpClient::Response Response = + Http.Post("/z$/exec$/replay-recording"sv, + HttpClient::KeyValueMap{}, + HttpClient::KeyValueMap({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}))) + { + ZEN_CONSOLE("{}", Response.ToText()); + + return; + } + else + { + Response.ThrowError("Failed to start replay"); + } + } + + std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); + uint64_t EntryCount = Replayer->GetRequestCount(); + + if (m_Offset >= EntryCount) + { + ZEN_CONSOLE("Offset {} is at or past the end of the recording ({} entries); nothing to replay", m_Offset, EntryCount); + return; + } + + std::atomic_uint64_t EntryOffset = m_Offset; + std::atomic_uint64_t BytesSent = 0; + std::atomic_uint64_t BytesReceived = 0; + + Stopwatch Timer; + + // The subcommand API does not receive argv, so look the zen executable path + // up from the current process to spawn child workers. + const std::filesystem::path SelfExePath = GetRunningExecutablePath(); + + if (m_ProcessCount > 1) + { + std::vector<std::unique_ptr<ProcessHandle>> WorkerProcesses; + WorkerProcesses.resize(m_ProcessCount); + + ProcessMonitor Monitor; + for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) + { + std::string CommandLine = + fmt::format("{} cache replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv, + SelfExePath.string(), + m_HostName, + m_RecordingPath, + m_Stride == 1 ? 0 : m_Offset + ProcessIndex, + m_Stride, + m_ThreadCount, + 1); + CreateProcResult Result(CreateProc(SelfExePath, CommandLine)); + WorkerProcesses[ProcessIndex] = std::make_unique<ProcessHandle>(); + WorkerProcesses[ProcessIndex]->Initialize(Result); + Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid()); + } + while (Monitor.IsRunning()) + { + ZEN_CONSOLE("Waiting for worker processes..."); + Sleep(1000); + } + return; + } + else + { + std::map<std::string, size_t> MethodTypes; + RwLock MethodTypesLock; + + WorkerThreadPool WorkerPool(m_ThreadCount); + + Latch WorkLatch(m_ThreadCount); + for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) + { + WorkerPool.ScheduleWork( + [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::map<std::string, size_t> LocalMethodTypes; + + auto ReduceTypes = MakeGuard([&] { + RwLock::ExclusiveLockScope __(MethodTypesLock); + + for (auto& Entry : LocalMethodTypes) + { + MethodTypes[Entry.first] += Entry.second; + } + }); + + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + + uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); + while (EntryIndex < EntryCount) + { + IoBuffer Payload; + const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); + + if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) + { + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: + { + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } + } + break; + case ZenContentType::kCbObject: + { + Request = LoadCompactBinaryObject(Payload); + } + break; + } + + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + + if (!m_DisableLocalRefs) + { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || + m_ForceAllowLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) + { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } + } + if (!m_DisableLocalHandleRefs) + { + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } + } + } + } + + if (m_ShowMethodStats) + { + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) + { + It->second++; + } + else + { + LocalMethodTypes[MethodName] = 1; + } + } + + if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) + { + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) + { + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); + } + if (AdjustedPid != 0) + { + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); + } + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) + { + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); + } + + if (RequestInfo.ContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } + } + + if (!m_DryRun) + { + Http.SetSessionId(RequestInfo.SessionId); + Payload.SetContentType(RequestInfo.ContentType); + + HttpClient::Response Response = + Http.Post("/z$/$rpc", Payload, {HttpClient::Accept(RequestInfo.AcceptType)}); + + BytesSent.fetch_add(Payload.GetSize()); + if (!Response) + { + ZEN_CONSOLE_ERROR("{}", Response); + break; + } + BytesReceived.fetch_add(Response.DownloadedBytes); + } + } + + EntryIndex = EntryOffset.fetch_add(m_Stride); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + + while (!WorkLatch.Wait(1000)) + { + // EntryCount > m_Offset is guaranteed by the early-return above. + // EntryOffset atomically overshoots EntryCount (fetch_add past the + // end) when the workload finishes, so clamp before subtracting. + const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; + const uint64_t CurrentOffset = EntryOffset.load(); + const uint64_t RequestsRemaining = CurrentOffset < EntryCount ? (EntryCount - CurrentOffset) / m_Stride : 0; + const uint64_t PercentDone = RequestsTotal > 0 ? (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal : 100; + + ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", + PercentDone, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + RequestsTotal, + RequestsRemaining, + NiceBytes(BytesSent.load()), + NiceBytes(BytesReceived.load())); + } + + if (m_ShowMethodStats) + { + for (const auto& It : MethodTypes) + { + ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); + } + } + } + + const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; + const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); + const uint64_t Sent = BytesSent.load(); + const uint64_t Received = BytesReceived.load(); + + ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", + RequestsSent, + NiceRate(RequestsSent, ElapsedMS, "req"), + NiceBytes(Sent), + NiceByteRate(Sent, ElapsedMS), + NiceBytes(Received), + NiceByteRate(Received, ElapsedMS), + NiceTimeSpanMs(ElapsedMS), + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); +} + } // namespace zen |