diff options
| author | Stefan Boberg <[email protected]> | 2026-04-20 10:00:10 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-20 10:00:10 +0200 |
| commit | dde485f0b777a62d65d817906a8e05caf2d18bc3 (patch) | |
| tree | 502642993f78b3df78dae4adec316b688493f1ea /src/zen/cmds/cache_cmd.cpp | |
| parent | builds cmd refactor (#975) (diff) | |
| download | archived-zen-dde485f0b777a62d65d817906a8e05caf2d18bc3.tar.xz archived-zen-dde485f0b777a62d65d817906a8e05caf2d18bc3.zip | |
consolidate cache commands into `cache` subcommand (#978)
Consolidate the scattered cache-related top-level commands into a single `zen cache <sub>` command tree, keeping the old names as hidden deprecated aliases so any existing scripts keep working.
## Motivation
`zen` has accumulated a flat list of cache-adjacent commands (`cache-info`, `cache-stats`, `cache-details`, `cache-gen`, `cache-get`, `drop`, `rpc-record-start/stop`, `rpc-record-replay`). Each one re-declares `--hosturl` parsing and host resolution, and there is no natural home for new cache tooling. Grouping them under `cache` gives a consistent UX and a shared base class to hang common options off of.
## Changes
### Subcommand consolidation
- Moved into `cache <sub>` form:
  - `cache info`, `cache stats`, `cache details`, `cache gen`, `cache get`, `cache drop`
  - `cache record <path>` / `cache record stop` (formerly `rpc-record-start` / `rpc-record-stop`)
  - `cache replay` (formerly `rpc-record-replay`)
- All old top-level names remain as deprecated aliases and forward through a shared legacy-shim dispatcher that rewrites `argv` and re-enters the new dispatcher, so behavior is byte-identical for existing callers.
- Deprecated aliases are now hidden from the top-level `zen --help` listing (new `ZenCmdBase::IsHidden()` + `DeprecatedCacheStoreCommand` base). They still dispatch normally; `zen cache --help` is the canonical discovery surface.
### Shared base class
- New `CacheSubCmdBase` owns the `--hosturl` option and `ResolveHost()` logic, eliminating the copy/pasted block at the top of every `Run()`.
### Output format
- Added `--yaml` to `cache info`, `cache stats`, and `cache details` (negotiated server-side via `Accept: text/yaml`). `cache details` now rejects `--csv` and `--yaml` when both are given.
### Hardening
- `cache gen`: bounds-check requested sizes before allocating.
- `cache replay`: validate `--stride` / `--offset` and fix progress-math overflow edge cases.
Diffstat (limited to 'src/zen/cmds/cache_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/cache_cmd.cpp | 840 |
1 file changed, 651 insertions, 189 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp index a8c15f119..f024ea57e 100644 --- a/src/zen/cmds/cache_cmd.cpp +++ b/src/zen/cmds/cache_cmd.cpp @@ -8,19 +8,32 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/process.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> +#include <zencore/stream.h> #include <zencore/thread.h> +#include <zencore/timer.h> #include <zencore/workthreadpool.h> +#include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> #include <zenhttp/packageformat.h> #include <zenstore/cache/cachepolicy.h> +#include <zenutil/rpcrecording.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END #include <memory> #include <random> namespace zen { +using namespace std::literals; + namespace { IoBuffer CreateRandomBlob(uint64_t Size) { @@ -56,37 +69,110 @@ namespace { } } // namespace -DropCommand::DropCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheCommand + +CacheCommand::CacheCommand() { m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); - m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); - m_Options.parse_positional({"namespace", "bucket"}); + + AddSubCommand(m_DetailsSubCmd); + AddSubCommand(m_DropSubCmd); + AddSubCommand(m_GenSubCmd); + AddSubCommand(m_GetSubCmd); + AddSubCommand(m_InfoSubCmd); + AddSubCommand(m_RecordSubCmd); + AddSubCommand(m_ReplaySubCmd); + AddSubCommand(m_StatsSubCmd); } -DropCommand::~DropCommand() = default; +CacheCommand::~CacheCommand() = default; -void -DropCommand::Run(const ZenCliOptions& 
GlobalOptions, int argc, char** argv) +//////////////////////////////////////////////////////////////////////////////// +// CacheSubCmdBase + +CacheSubCmdBase::CacheSubCmdBase(std::string_view Name, std::string_view Description) : ZenSubCmdBase(Name, Description) { - ZEN_UNUSED(GlobalOptions); + m_SubOptions.add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); +} - if (!ParseOptions(argc, argv)) +void +CacheSubCmdBase::ResolveHost() +{ + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) { - return; + throw OptionParseException("Unable to resolve server specification", m_SubOptions.help()); } +} - m_HostName = ResolveTargetHostSpec(m_HostName); +//////////////////////////////////////////////////////////////////////////////// +// Legacy shim dispatcher - if (m_HostName.empty()) +namespace cache_legacy_shim { + static void Dispatch(std::span<const std::string_view> Injected, const ZenCliOptions& GlobalOptions, int argc, char** argv) + { + // cxxopts treats argv as writable char** in the style of C main(argv). + // Stage the injected tokens in writable std::string storage so we never + // hand out pointers to string literals. 
+ std::vector<std::string> Storage; + Storage.reserve(Injected.size()); + for (std::string_view Token : Injected) + { + Storage.emplace_back(Token); + } + + std::vector<char*> NewArgv; + NewArgv.reserve(static_cast<size_t>(argc) + Storage.size()); + NewArgv.push_back(argv[0]); + for (std::string& Token : Storage) + { + NewArgv.push_back(Token.data()); + } + for (int i = 1; i < argc; ++i) + { + NewArgv.push_back(argv[i]); + } + + CacheCommand Impl; + Impl.Run(GlobalOptions, static_cast<int>(NewArgv.size()), NewArgv.data()); + } + + void RunAs(const char* SubCommandName, const ZenCliOptions& GlobalOptions, int argc, char** argv) { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); + const std::string_view Tokens[] = {std::string_view(SubCommandName)}; + Dispatch(Tokens, GlobalOptions, argc, argv); } +} // namespace cache_legacy_shim + +// RpcStopRecordingCommand is unique among legacy shims in that it needs to +// inject two tokens ("record" and "stop") rather than a single subcommand name. 
+void +RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + const std::string_view Tokens[] = {"record"sv, "stop"sv}; + cache_legacy_shim::Dispatch(Tokens, GlobalOptions, argc, argv); +} + +//////////////////////////////////////////////////////////////////////////////// +// CacheDropSubCmd + +CacheDropSubCmd::CacheDropSubCmd() : CacheSubCmdBase("drop", "Drop cache namespace or bucket") +{ + m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); + m_SubOptions.parse_positional({"namespace", "bucket"}); +} + +void +CacheDropSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + ResolveHost(); if (m_NamespaceName.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } std::string Url; @@ -105,7 +191,7 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("Dropping {}", DropDescription); - HttpClient Http = CreateHttpClient(m_HostName); + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Delete(Url)) { ZEN_CONSOLE("{}", Response.ToText()); @@ -116,52 +202,39 @@ DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } -CacheInfoCommand::CacheInfoCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheInfoSubCmd + +CacheInfoSubCmd::CacheInfoSubCmd() : CacheSubCmdBase("info", "Info on cache, namespace or bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "n", "namespace", "Namespace name", 
cxxopts::value(m_NamespaceName), "<namespacename>"); - m_Options.add_option("", - "", - "bucketsizes", - "Comma delimited list of bucket names to get size info from, * to get info on all buckets", - cxxopts::value(m_SizeInfoBucketNames), - "<bucketnames>"); - m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); - m_Options.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>"); - - m_Options.parse_positional({"namespace", "bucket"}); + m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), "<namespacename>"); + m_SubOptions.add_option("", + "", + "bucketsizes", + "Comma delimited list of bucket names to get size info from, * to get info on all buckets", + cxxopts::value(m_SizeInfoBucketNames), + "<bucketnames>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), "<bucketname>"); + m_SubOptions.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), "<bucketsize>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); + m_SubOptions.parse_positional({"namespace", "bucket"}); } -CacheInfoCommand::~CacheInfoCommand() = default; - void -CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheInfoSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); std::string Url; - if (m_HostName.empty()) + if (m_NamespaceName.empty()) { if (!m_SizeInfoBucketNames.empty()) { - throw OptionParseException("'--bucketsizes' requires '--namespace'", m_Options.help()); + throw 
OptionParseException("'--bucketsizes' requires '--namespace'", m_SubOptions.help()); } if (m_BucketSizeInfo) { - throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_Options.help()); + throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_SubOptions.help()); } ZEN_CONSOLE("Info on cache from '{}'", m_HostName); Url = "/z$"; @@ -171,7 +244,7 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_BucketSizeInfo) { throw OptionParseException(fmt::format("'--bucketsize' requires '--namespace' and '--bucket' ('{}')", m_BucketName), - m_Options.help()); + m_SubOptions.help()); } ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, m_HostName); Url = fmt::format("/z$/{}", m_NamespaceName); @@ -180,7 +253,7 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (!m_SizeInfoBucketNames.empty()) { - throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_Options.help()); + throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_SubOptions.help()); } ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); @@ -196,8 +269,10 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Parameters.Entries.insert({"bucketsize", "true"}); } - HttpClient Http = CreateHttpClient(m_HostName); - if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON), Parameters)) + const ZenContentType AcceptType = m_YAML ? 
ZenContentType::kYAML : ZenContentType::kJSON; + + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(AcceptType), Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } @@ -207,76 +282,59 @@ CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } -CacheStatsCommand::CacheStatsCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheStatsSubCmd + +CacheStatsSubCmd::CacheStatsSubCmd() : CacheSubCmdBase("stats", "Stats on cache") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); } -CacheStatsCommand::~CacheStatsCommand() = default; - void -CacheStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheStatsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } + ResolveHost(); - m_HostName = ResolveTargetHostSpec(m_HostName); + const ZenContentType AcceptType = m_YAML ? 
ZenContentType::kYAML : ZenContentType::kJSON; - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } - - HttpClient Http = CreateHttpClient(m_HostName); - if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(ZenContentType::kJSON))) + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(AcceptType))) { ZEN_CONSOLE("{}", Response.ToText()); } else { - Response.ThrowError("Info failed"); + Response.ThrowError("Stats failed"); } } -CacheDetailsCommand::CacheDetailsCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheDetailsSubCmd + +CacheDetailsSubCmd::CacheDetailsSubCmd() : CacheSubCmdBase("details", "Details on cache") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options.add_option("", "c", "csv", "Info on csv format", cxxopts::value(m_CSV), "<csv>"); - m_Options.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>"); - m_Options.add_option("", - "a", - "attachmentdetails", - "Get detailed information about attachments", - cxxopts::value(m_AttachmentDetails), - "<attachmentdetails>"); - m_Options.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>"); - m_Options.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>"); + m_SubOptions.add_option("", "c", "csv", "Output as CSV instead of JSON", cxxopts::value(m_CSV), "<csv>"); + m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), "<yaml>"); + 
m_SubOptions.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "<details>"); + m_SubOptions.add_option("", + "a", + "attachmentdetails", + "Get detailed information about attachments", + cxxopts::value(m_AttachmentDetails), + "<attachmentdetails>"); + m_SubOptions.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), "<namespace>"); + m_SubOptions.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), "<valuekey>"); } -CacheDetailsCommand::~CacheDetailsCommand() = default; - void -CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheDetailsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } + ResolveHost(); - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) + if (m_CSV && m_YAML) { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); + throw OptionParseException("'--csv' conflicts with '--yaml'", m_SubOptions.help()); } HttpClient::KeyValueMap Parameters; @@ -296,7 +354,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar } else { - Headers = HttpClient::Accept(ZenContentType::kJSON); + Headers = HttpClient::Accept(m_YAML ? 
ZenContentType::kYAML : ZenContentType::kJSON); } std::string Url; @@ -304,11 +362,11 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar { if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { - throw OptionParseException("'--bucket' is required", m_Options.help()); + throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); } @@ -316,7 +374,7 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar { if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}", m_Namespace, m_Bucket); } @@ -329,61 +387,48 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar Url = "/z$/details$"; } - HttpClient Http = CreateHttpClient(m_HostName); + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Get(Url, Headers, Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { - Response.ThrowError("Info failed"); + Response.ThrowError("Details failed"); } } -CacheGenerateCommand::CacheGenerateCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheGenSubCmd + +CacheGenSubCmd::CacheGenSubCmd() : CacheSubCmdBase("gen", "Generates cache values into a bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options + m_SubOptions .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>"); - 
m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>"); - m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>"); - m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>"); - m_Options.add_option("", - "", - "min-attachments", - "Minimum number of attachments when creating record based values", - cxxopts::value(m_MinAttachmentCount), - "<minattachments>"); - m_Options.add_option("", - "", - "max-attachments", - "Minimum number of attachments when creating record based values, 0 to only create cache values", - cxxopts::value(m_MaxAttachmentCount), - "<maxattachments>"); - m_Options.parse_positional({"namespace", "bucket", "count"}); - m_Options.positional_help("namespace bucket count"); + m_SubOptions.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>"); + m_SubOptions.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>"); + m_SubOptions.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>"); + m_SubOptions.add_option("", + "", + "min-attachments", + "Minimum number of attachments when creating record based values", + cxxopts::value(m_MinAttachmentCount), + "<minattachments>"); + m_SubOptions.add_option("", + "", + "max-attachments", + "Minimum number of attachments when creating record based values, 0 to only create cache values", + cxxopts::value(m_MaxAttachmentCount), + "<maxattachments>"); + m_SubOptions.parse_positional({"namespace", 
"bucket", "count"}); + m_SubOptions.positional_help("namespace bucket count"); } -CacheGenerateCommand::~CacheGenerateCommand() = default; - void -CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheGenSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); if (m_MaxSize == 0 && m_MinSize == 0) { @@ -400,6 +445,16 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } + // The size-range expansion below requires MinSize >= 1 (it uses + // `MinSize - 1` as a uniform distribution upper bound, which would + // underflow on an unsigned zero) and MaxSize >= MinSize. + if (m_MinSize == 0 || m_MaxSize < m_MinSize) + { + throw OptionParseException( + fmt::format("'--min-size' ({}) must be >= 1 and '--max-size' ({}) must be >= '--min-size'", m_MinSize, m_MaxSize), + m_SubOptions.help()); + } + std::vector<std::uniform_int_distribution<uint64_t>> Variations; std::vector<size_t> SizeRanges; SizeRanges.push_back(m_MinSize); @@ -431,7 +486,7 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a std::uniform_int_distribution<uint64_t> KeyDistribution; - HttpClient Http = CreateHttpClient(m_HostName); + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); auto GeneratePutCacheValueRequest( [this, &KeyDistribution, &Generator](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex) -> CbPackage { @@ -583,68 +638,52 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } -CacheGetCommand::CacheGetCommand() +//////////////////////////////////////////////////////////////////////////////// +// CacheGetSubCmd + +CacheGetSubCmd::CacheGetSubCmd() : CacheSubCmdBase("get", 
"Get cache values/records or attachments from a bucket") { - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); - m_Options - .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>"); - m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>"); - m_Options.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>"); - m_Options.add_option("", - "a", - "attachmenthash", - "For a cache entry record, get a particular attachment based on the 'RawHash'", - cxxopts::value(m_AttachmentHash), - "<attachmenthash>"); - m_Options.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>"); - m_Options.add_option("", "t", "text", "Ouput content of cache entry record as text", cxxopts::value(m_AsText), "<text>"); - m_Options + m_SubOptions.add_option("", "n", "namespace", "Namespace of the cache entry", cxxopts::value(m_Namespace), "<namespace>"); + m_SubOptions.add_option("", "b", "bucket", "Bucket of the cache entry", cxxopts::value(m_Bucket), "<bucket>"); + m_SubOptions.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), "<valuekey>"); + m_SubOptions.add_option("", + "a", + "attachmenthash", + "For a cache entry record, get a particular attachment based on the 'RawHash'", + cxxopts::value(m_AttachmentHash), + "<attachmenthash>"); + m_SubOptions.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), "<path>"); + m_SubOptions.add_option("", "t", "text", "Output content of cache entry record as text", cxxopts::value(m_AsText), "<text>"); + m_SubOptions .add_option("", "d", "decompress", "Decompress data when applicable. 
Default = true", cxxopts::value(m_Decompress), "<decompress>"); - m_Options.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); - m_Options.positional_help("namespace bucket valuekey attachmenthash"); + m_SubOptions.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); + m_SubOptions.positional_help("namespace bucket valuekey attachmenthash"); } -CacheGetCommand::~CacheGetCommand() = default; - void -CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +CacheGetSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - ZEN_UNUSED(GlobalOptions); - - using namespace std::literals; - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_HostName.empty()) - { - throw OptionParseException("Unable to resolve server specification", m_Options.help()); - } + ResolveHost(); if (m_Namespace.empty()) { - throw OptionParseException("'--namespace' is required", m_Options.help()); + throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { - throw OptionParseException("'--bucket' is required", m_Options.help()); + throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } if (m_ValueKey.empty()) { - throw OptionParseException("'--valuekey' is required", m_Options.help()); + throw OptionParseException("'--valuekey' is required", m_SubOptions.help()); } IoHash ValueId; if (!IoHash::TryParse(m_ValueKey, ValueId)) { - throw OptionParseException(fmt::format("'--value-key' ('{}') is malformed", m_ValueKey), m_Options.help()); + throw OptionParseException(fmt::format("'--valuekey' ('{}') is malformed", m_ValueKey), m_SubOptions.help()); } IoHash AttachmentHash; @@ -652,11 +691,11 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (!IoHash::TryParse(m_AttachmentHash, AttachmentHash)) { - throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", 
m_AttachmentHash), m_Options.help()); + throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_SubOptions.help()); } } - HttpClient Http = CreateHttpClient(m_HostName); + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); if (!m_OutputPath.empty()) { @@ -674,7 +713,7 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_OutputPath = (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash); } - std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); + std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, ValueId); if (AttachmentHash != IoHash::Zero) { Url = fmt::format("{}/{}", Url, AttachmentHash); @@ -720,4 +759,427 @@ CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } +//////////////////////////////////////////////////////////////////////////////// +// CacheRecordSubCmd + +CacheRecordSubCmd::CacheRecordSubCmd() +: CacheSubCmdBase("record", "Start recording cache rpc requests ('cache record <path>'), or stop ('cache record stop')") +{ + m_SubOptions.add_option("", "p", "path", "Recording file path, or 'stop' to stop recording", cxxopts::value(m_Path), "<path>"); + m_SubOptions.parse_positional("path"); + m_SubOptions.positional_help("<path>|stop"); +} + +void +CacheRecordSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + ResolveHost(); + + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + + if (m_Path == "stop") + { + if (HttpClient::Response Response = Http.Post("/z$/exec$/stop-recording"sv)) + { + ZEN_CONSOLE("{}", Response.ToText()); + } + else + { + Response.ThrowError("Failed to stop recording"); + } + return; + } + + if (m_Path.empty()) + { + throw OptionParseException("recording path is required (use '<path>' to start, 'stop' to stop)", m_SubOptions.help()); + } + + if (HttpClient::Response Response = + Http.Post("/z$/exec$/start-recording"sv, HttpClient::KeyValueMap{}, 
HttpClient::KeyValueMap({{"path", m_Path}}))) + { + ZEN_CONSOLE("{}", Response.ToText()); + } + else + { + Response.ThrowError("Failed to start recording"); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// CacheReplaySubCmd + +CacheReplaySubCmd::CacheReplaySubCmd() : CacheSubCmdBase("replay", "Replays a previously recorded session of rpc requests") +{ + m_SubOptions.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_SubOptions.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); + m_SubOptions.add_option("", + "w", + "numthreads", + "Number of worker threads per process", + cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())), + "<count>"); + m_SubOptions.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>"); + m_SubOptions.add_option("", + "", + "showmethodstats", + "Show statistics of which RPC methods are used", + cxxopts::value(m_ShowMethodStats), + "<showmethodstats>"); + m_SubOptions.add_option("", + "", + "offset", + "Offset into request recording to start replay", + cxxopts::value(m_Offset)->default_value("0"), + "<offset>"); + m_SubOptions.add_option("", + "", + "stride", + "Stride for request recording when replaying requests", + cxxopts::value(m_Stride)->default_value("1"), + "<stride>"); + m_SubOptions.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>"); + m_SubOptions.add_option("", + "", + "forceallowlocalrefs", + "Force enable local refs in requests", + cxxopts::value(m_ForceAllowLocalRefs), + "<enable>"); + m_SubOptions + .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), "<enable>"); + m_SubOptions.add_option("", + "", + "forceallowlocalhandlerefs", + "Force enable local refs as handles in 
requests", + cxxopts::value(m_ForceAllowLocalHandleRef), + "<enable>"); + m_SubOptions.add_option("", + "", + "disablelocalhandlerefs", + "Force disable local refs as handles in requests", + cxxopts::value(m_DisableLocalHandleRefs), + "<enable>"); + m_SubOptions.add_option("", + "", + "forceallowpartiallocalrefs", + "Force enable local refs for all sizes", + cxxopts::value(m_ForceAllowPartialLocalRefs), + "<enable>"); + m_SubOptions.add_option("", + "", + "disablepartiallocalrefs", + "Force disable local refs for all sizes", + cxxopts::value(m_DisablePartialLocalRefs), + "<enable>"); + m_SubOptions.parse_positional("path"); +} + +void +CacheReplaySubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) +{ + ResolveHost(); + + if (m_RecordingPath.empty()) + { + throw OptionParseException("'--path' is required", m_SubOptions.help()); + } + + if (!IsDir(m_RecordingPath)) + { + throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); + } + + if (m_Stride == 0) + { + throw OptionParseException("'--stride' must be >= 1", m_SubOptions.help()); + } + + m_ThreadCount = Max(m_ThreadCount, 1); + + ZEN_CONSOLE("Replay '{}' (start offset {}, stride {}) to '{}', {} threads", + m_RecordingPath, + m_Offset, + m_Stride, + m_HostName, + m_ThreadCount); + + Stopwatch TotalTimer; + + if (m_OnHost) + { + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + if (HttpClient::Response Response = + Http.Post("/z$/exec$/replay-recording"sv, + HttpClient::KeyValueMap{}, + HttpClient::KeyValueMap({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}))) + { + ZEN_CONSOLE("{}", Response.ToText()); + + return; + } + else + { + Response.ThrowError("Failed to start replay"); + } + } + + std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); + uint64_t EntryCount = Replayer->GetRequestCount(); + + if (m_Offset >= EntryCount) + { + ZEN_CONSOLE("Offset {} is at or past the end of 
the recording ({} entries); nothing to replay", m_Offset, EntryCount); + return; + } + + std::atomic_uint64_t EntryOffset = m_Offset; + std::atomic_uint64_t BytesSent = 0; + std::atomic_uint64_t BytesReceived = 0; + + Stopwatch Timer; + + // The subcommand API does not receive argv, so look the zen executable path + // up from the current process to spawn child workers. + const std::filesystem::path SelfExePath = GetRunningExecutablePath(); + + if (m_ProcessCount > 1) + { + std::vector<std::unique_ptr<ProcessHandle>> WorkerProcesses; + WorkerProcesses.resize(m_ProcessCount); + + ProcessMonitor Monitor; + for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) + { + std::string CommandLine = + fmt::format("{} cache replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv, + SelfExePath.string(), + m_HostName, + m_RecordingPath, + m_Stride == 1 ? 0 : m_Offset + ProcessIndex, + m_Stride, + m_ThreadCount, + 1); + CreateProcResult Result(CreateProc(SelfExePath, CommandLine)); + WorkerProcesses[ProcessIndex] = std::make_unique<ProcessHandle>(); + WorkerProcesses[ProcessIndex]->Initialize(Result); + Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid()); + } + while (Monitor.IsRunning()) + { + ZEN_CONSOLE("Waiting for worker processes..."); + Sleep(1000); + } + return; + } + else + { + std::map<std::string, size_t> MethodTypes; + RwLock MethodTypesLock; + + WorkerThreadPool WorkerPool(m_ThreadCount); + + Latch WorkLatch(m_ThreadCount); + for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) + { + WorkerPool.ScheduleWork( + [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::map<std::string, size_t> LocalMethodTypes; + + auto ReduceTypes = MakeGuard([&] { + RwLock::ExclusiveLockScope __(MethodTypesLock); + + for (auto& Entry : LocalMethodTypes) + { + 
MethodTypes[Entry.first] += Entry.second; + } + }); + + HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); + + uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); + while (EntryIndex < EntryCount) + { + IoBuffer Payload; + const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); + + if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) + { + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: + { + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } + } + break; + case ZenContentType::kCbObject: + { + Request = LoadCompactBinaryObject(Payload); + } + break; + } + + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + + if (!m_DisableLocalRefs) + { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || + m_ForceAllowLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) + { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } + } + if (!m_DisableLocalHandleRefs) + { + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } + } + } + } + + if (m_ShowMethodStats) + { + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) + { + It->second++; + } + else + { + LocalMethodTypes[MethodName] = 1; + } + } + + if (OriginalAcceptOptions != AdjustedAcceptOptions || 
OriginalProcessPid != AdjustedPid) + { + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) + { + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); + } + if (AdjustedPid != 0) + { + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); + } + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) + { + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); + } + + if (RequestInfo.ContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } + } + + if (!m_DryRun) + { + Http.SetSessionId(RequestInfo.SessionId); + Payload.SetContentType(RequestInfo.ContentType); + + HttpClient::Response Response = + Http.Post("/z$/$rpc", Payload, {HttpClient::Accept(RequestInfo.AcceptType)}); + + BytesSent.fetch_add(Payload.GetSize()); + if (!Response) + { + ZEN_CONSOLE_ERROR("{}", Response); + break; + } + BytesReceived.fetch_add(Response.DownloadedBytes); + } + } + + EntryIndex = EntryOffset.fetch_add(m_Stride); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + + while (!WorkLatch.Wait(1000)) + { + // EntryCount > m_Offset is guaranteed by the early-return above. + // EntryOffset atomically overshoots EntryCount (fetch_add past the + // end) when the workload finishes, so clamp before subtracting. 
+ const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; + const uint64_t CurrentOffset = EntryOffset.load(); + const uint64_t RequestsRemaining = CurrentOffset < EntryCount ? (EntryCount - CurrentOffset) / m_Stride : 0; + const uint64_t PercentDone = RequestsTotal > 0 ? (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal : 100; + + ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", + PercentDone, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + RequestsTotal, + RequestsRemaining, + NiceBytes(BytesSent.load()), + NiceBytes(BytesReceived.load())); + } + + if (m_ShowMethodStats) + { + for (const auto& It : MethodTypes) + { + ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); + } + } + } + + const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; + const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); + const uint64_t Sent = BytesSent.load(); + const uint64_t Received = BytesReceived.load(); + + ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", + RequestsSent, + NiceRate(RequestsSent, ElapsedMS, "req"), + NiceBytes(Sent), + NiceByteRate(Sent, ElapsedMS), + NiceBytes(Received), + NiceByteRate(Received, ElapsedMS), + NiceTimeSpanMs(ElapsedMS), + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); +} + } // namespace zen |