// Copyright Epic Games, Inc. All Rights Reserved. #include "cache_cmd.h" #include "zenserviceclient.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { using namespace std::literals; namespace { IoBuffer CreateRandomBlob(uint64_t Size) { static uint64_t Seed{0x7CEBF54E45B9F5D1}; auto Next = [](uint64_t& seed) { uint64_t z = (seed += UINT64_C(0x9E3779B97F4A7C15)); z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9); z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB); return z ^ (z >> 31); }; IoBuffer Data(Size); uint64_t* DataPtr = reinterpret_cast(Data.MutableData()); while (Size > sizeof(uint64_t)) { *DataPtr++ = Next(Seed); Size -= sizeof(uint64_t); } uint64_t ByteNext = Next(Seed); uint8_t* ByteDataPtr = reinterpret_cast(DataPtr); while (Size > 0) { *ByteDataPtr++ = static_cast(ByteNext & 0xff); ByteNext >>= 8; Size--; } return Data; }; CompressedBuffer CompressBlob(IoBuffer&& Buffer) { return CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::SuperFast); } } // namespace //////////////////////////////////////////////////////////////////////////////// // CacheCommand CacheCommand::CacheCommand() { m_Options.add_options()("h,help", "Print help"); AddSubCommand(m_DetailsSubCmd); AddSubCommand(m_DropSubCmd); AddSubCommand(m_GenSubCmd); AddSubCommand(m_GetSubCmd); AddSubCommand(m_InfoSubCmd); AddSubCommand(m_RecordSubCmd); AddSubCommand(m_ReplaySubCmd); AddSubCommand(m_StatsSubCmd); } CacheCommand::~CacheCommand() = default; //////////////////////////////////////////////////////////////////////////////// // CacheSubCmdBase CacheSubCmdBase::CacheSubCmdBase(std::string_view Name, std::string_view Description) : ZenSubCmdBase(Name, Description) { m_SubOptions.add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); } void CacheSubCmdBase::ResolveHost() { m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_SubOptions.help()); } } //////////////////////////////////////////////////////////////////////////////// // Legacy shim dispatcher namespace cache_legacy_shim { static void Dispatch(std::span Injected, const ZenCliOptions& GlobalOptions, int argc, char** argv) { // cxxopts treats argv as writable char** in the style of C main(argv). // Stage the injected tokens in writable std::string storage so we never // hand out pointers to string literals. std::vector Storage; Storage.reserve(Injected.size()); for (std::string_view Token : Injected) { Storage.emplace_back(Token); } std::vector NewArgv; NewArgv.reserve(static_cast(argc) + Storage.size()); NewArgv.push_back(argv[0]); for (std::string& Token : Storage) { NewArgv.push_back(Token.data()); } for (int i = 1; i < argc; ++i) { NewArgv.push_back(argv[i]); } CacheCommand Impl; Impl.Run(GlobalOptions, static_cast(NewArgv.size()), NewArgv.data()); } void RunAs(const char* SubCommandName, const ZenCliOptions& GlobalOptions, int argc, char** argv) { const std::string_view Tokens[] = {std::string_view(SubCommandName)}; Dispatch(Tokens, GlobalOptions, argc, argv); } } // namespace cache_legacy_shim // RpcStopRecordingCommand is unique among legacy shims in that it needs to // inject two tokens ("record" and "stop") rather than a single subcommand name. void RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { using namespace std::literals; const std::string_view Tokens[] = {"record"sv, "stop"sv}; cache_legacy_shim::Dispatch(Tokens, GlobalOptions, argc, argv); } //////////////////////////////////////////////////////////////////////////////// // CacheDropSubCmd CacheDropSubCmd::CacheDropSubCmd() : CacheSubCmdBase("drop", "Drop cache namespace or bucket") { m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), ""); m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), ""); m_SubOptions.parse_positional({"namespace", "bucket"}); } void CacheDropSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "drop"}); HttpClient& Http = Service.Http(); if (m_NamespaceName.empty()) { throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } std::string Url; std::string DropDescription; if (m_BucketName.empty()) { DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec()); Url = fmt::format("/z$/{}", m_NamespaceName); } else { DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec()); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } ZEN_CONSOLE("Dropping {}", DropDescription); if (HttpClient::Response Response = Http.Delete(Url)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError(fmt::format("Failed to drop {}", DropDescription)); } } //////////////////////////////////////////////////////////////////////////////// // CacheInfoSubCmd CacheInfoSubCmd::CacheInfoSubCmd() : CacheSubCmdBase("info", "Info on cache, namespace or bucket") { m_SubOptions.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), ""); m_SubOptions.add_option("", "", "bucketsizes", "Comma delimited list of bucket names to get size info from, * to get info on all buckets", cxxopts::value(m_SizeInfoBucketNames), ""); m_SubOptions.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), ""); m_SubOptions.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), ""); m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), ""); m_SubOptions.parse_positional({"namespace", "bucket"}); } void CacheInfoSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "info"}); HttpClient& Http = Service.Http(); std::string Url; if (m_NamespaceName.empty()) { if (!m_SizeInfoBucketNames.empty()) { throw OptionParseException("'--bucketsizes' requires '--namespace'", m_SubOptions.help()); } if (m_BucketSizeInfo) { throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_SubOptions.help()); } ZEN_CONSOLE("Info on cache from '{}'", Service.HostSpec()); Url = "/z$"; } else if (m_BucketName.empty()) { if (m_BucketSizeInfo) { throw OptionParseException(fmt::format("'--bucketsize' requires '--namespace' and '--bucket' ('{}')", m_BucketName), m_SubOptions.help()); } ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, Service.HostSpec()); Url = fmt::format("/z$/{}", m_NamespaceName); } else { if (!m_SizeInfoBucketNames.empty()) { throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_SubOptions.help()); } ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, Service.HostSpec()); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } HttpClient::KeyValueMap Parameters; if (!m_SizeInfoBucketNames.empty()) { Parameters.Entries.insert({"bucketsizes", m_SizeInfoBucketNames}); } if (m_BucketSizeInfo) { Parameters.Entries.insert({"bucketsize", "true"}); } const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON; if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(AcceptType), Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Info failed"); } } //////////////////////////////////////////////////////////////////////////////// // CacheStatsSubCmd CacheStatsSubCmd::CacheStatsSubCmd() : CacheSubCmdBase("stats", "Stats on cache") { m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), ""); } void CacheStatsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "stats"}); HttpClient& Http = Service.Http(); const ZenContentType AcceptType = m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON; if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(AcceptType))) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Stats failed"); } } //////////////////////////////////////////////////////////////////////////////// // CacheDetailsSubCmd CacheDetailsSubCmd::CacheDetailsSubCmd() : CacheSubCmdBase("details", "Details on cache") { m_SubOptions.add_option("", "c", "csv", "Output as CSV instead of JSON", cxxopts::value(m_CSV), ""); m_SubOptions.add_option("", "y", "yaml", "Output as YAML instead of JSON", cxxopts::value(m_YAML), ""); m_SubOptions.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "
"); m_SubOptions.add_option("", "a", "attachmentdetails", "Get detailed information about attachments", cxxopts::value(m_AttachmentDetails), ""); m_SubOptions.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), ""); m_SubOptions.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), ""); m_SubOptions.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), ""); } void CacheDetailsSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "details"}); HttpClient& Http = Service.Http(); if (m_CSV && m_YAML) { throw OptionParseException("'--csv' conflicts with '--yaml'", m_SubOptions.help()); } HttpClient::KeyValueMap Parameters; if (m_Details) { Parameters.Entries.insert({"details", "true"}); } if (m_AttachmentDetails) { Parameters.Entries.insert({"attachmentdetails", "true"}); } HttpClient::KeyValueMap Headers; if (m_CSV) { Parameters.Entries.insert({"csv", "true"}); } else { Headers = HttpClient::Accept(m_YAML ? ZenContentType::kYAML : ZenContentType::kJSON); } std::string Url; if (!m_ValueKey.empty()) { if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); } else if (!m_Bucket.empty()) { if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } Url = fmt::format("/z$/details$/{}/{}", m_Namespace, m_Bucket); } else if (!m_Namespace.empty()) { Url = fmt::format("/z$/details$/{}", m_Namespace); } else { Url = "/z$/details$"; } if (HttpClient::Response Response = Http.Get(Url, Headers, Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Details failed"); } } //////////////////////////////////////////////////////////////////////////////// // CacheGenSubCmd CacheGenSubCmd::CacheGenSubCmd() : CacheSubCmdBase("gen", "Generates cache values into a bucket") { m_SubOptions .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), ""); m_SubOptions.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), ""); m_SubOptions.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), ""); m_SubOptions.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), ""); m_SubOptions.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), ""); m_SubOptions.add_option("", "", "min-attachments", "Minimum number of attachments when creating record based values", cxxopts::value(m_MinAttachmentCount), ""); m_SubOptions.add_option("", "", "max-attachments", "Minimum number of attachments when creating record based values, 0 to only create cache values", cxxopts::value(m_MaxAttachmentCount), ""); m_SubOptions.parse_positional({"namespace", "bucket", "count"}); m_SubOptions.positional_help("namespace bucket count"); } void CacheGenSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "gen"}); HttpClient& Http = Service.Http(); if (m_MaxSize == 0 && m_MinSize == 0) { m_MinSize = 17; if (m_MaxAttachmentCount == 0) { // For cache values this max size will result in about 0.5% of values being saved as loose file in a cache bucket m_MaxSize = 65u * 1024u; } else { // For cache records this max size will result in about 0.5% of attachments begin saved as loose files in cas m_MaxSize = 768u * 1024u; } } // The size-range expansion below requires MinSize >= 1 (it uses // `MinSize - 1` as a uniform distribution upper bound, which would // underflow on an unsigned zero) and MaxSize >= MinSize. if (m_MinSize == 0 || m_MaxSize < m_MinSize) { throw OptionParseException( fmt::format("'--min-size' ({}) must be >= 1 and '--max-size' ({}) must be >= '--min-size'", m_MinSize, m_MaxSize), m_SubOptions.help()); } std::vector> Variations; std::vector SizeRanges; SizeRanges.push_back(m_MinSize); Variations.push_back(std::uniform_int_distribution(0, m_MinSize - 1)); while (SizeRanges.back() < m_MaxSize) { SizeRanges.push_back(SizeRanges.back() * 2); Variations.push_back(std::uniform_int_distribution(0, SizeRanges.back() - 1)); } if (SizeRanges.back() > m_MaxSize) { SizeRanges.back() = m_MaxSize; Variations.push_back(std::uniform_int_distribution(0, m_MaxSize - 1)); } std::random_device RandomDevice; std::mt19937 Generator(RandomDevice()); std::uniform_int_distribution SizeRangeDistribution(0, SizeRanges.size() - 1); std::vector Sizes; Sizes.reserve(m_Count); for (uint64_t n = 0; n != m_Count; ++n) { uint64_t Range = SizeRangeDistribution(Generator); uint64_t Size = SizeRanges[Range]; uint64_t Variation = Variations[Range](Generator); Sizes.push_back(Size + Variation); } std::uniform_int_distribution KeyDistribution; auto GeneratePutCacheValueRequest( [this, &KeyDistribution, &Generator](std::span BatchSizes, uint64_t RequestIndex) -> CbPackage { CbPackage Package; CbObjectWriter Writer; Writer << "Method" << "PutCacheValues"; Writer << "Accept" << kCbPkgMagic; Writer.BeginObject("Params"); { Writer << "DefaultPolicy" << WriteToString<128>(CachePolicy::Default); Writer << "Namespace" << m_Namespace; Writer.BeginArray("Requests"); for (std::uint64_t ValueSize : BatchSizes) { Writer.BeginObject(); { uint64_t KeyBase = KeyDistribution(Generator); std::string KeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, ValueSize); IoHash ValueKey = IoHash::HashBuffer(KeyString.c_str(), KeyString.length()); Writer.BeginObject("Key"); { Writer << "Bucket" << m_Bucket; Writer << "Hash" << ValueKey; } Writer.EndObject(); // Key CompressedBuffer Payload = CompressBlob(CreateRandomBlob(ValueSize)); Writer.AddBinaryAttachment("RawHash", Payload.DecodeRawHash()); Package.AddAttachment(CbAttachment(Payload, Payload.DecodeRawHash())); } Writer.EndObject(); } Writer.EndArray(); // Requests } Writer.EndObject(); // Params Package.SetObject(Writer.Save()); return Package; }); auto GeneratePutCacheRecordRequest([this, &KeyDistribution, &Generator](std::span BatchSizes, uint64_t RequestIndex) { CbPackage Package; CbObjectWriter Writer; Writer << "Method" << "PutCacheRecords"; Writer << "Accept" << kCbPkgMagic; Writer.BeginObject("Params"); { Writer << "DefaultPolicy" << WriteToString<128>(CachePolicy::Default); Writer << "Namespace" << m_Namespace; Writer.BeginArray("Requests"); { Writer.BeginObject(); { Writer.BeginObject("Record"); { uint64_t KeyBase = KeyDistribution(Generator); std::string RecordKeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, BatchSizes.size()); IoHash RecordKey = IoHash::HashBuffer(RecordKeyString.c_str(), RecordKeyString.length()); Writer.BeginObject("Key"); { Writer << "Bucket" << m_Bucket; Writer << "Hash" << RecordKey; } Writer.EndObject(); // Key Writer.BeginArray("Values"); for (std::uint64_t ValueSize : BatchSizes) { Writer.BeginObject(); { Writer.AddObjectId("Id", Oid::NewOid()); CompressedBuffer Payload = CompressBlob(CreateRandomBlob(ValueSize)); Writer.AddBinaryAttachment("RawHash", Payload.DecodeRawHash()); Package.AddAttachment(CbAttachment(Payload, Payload.DecodeRawHash())); Writer.AddInteger("RawSize", Payload.DecodeRawSize()); } Writer.EndObject(); } Writer.EndArray(); // Values } Writer.EndObject(); // Record } Writer.EndObject(); } Writer.EndArray(); // Requests } Writer.EndObject(); // Params Package.SetObject(Writer.Save()); return Package; }); WorkerThreadPool WorkerPool(gsl::narrow(Max((GetHardwareConcurrency() / 2u), 2u))); Latch WorkLatch(1); std::uniform_int_distribution SizeCountDistribution(m_MaxAttachmentCount > 0 ? 0 : 1, m_MaxAttachmentCount > 0 ? m_MaxAttachmentCount : 8); std::size_t Offset = 0; uint64_t RequestIndex = 0; while (Offset < Sizes.size()) { size_t SizeCount = SizeCountDistribution(Generator); std::span BatchSizes = std::span(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset)); WorkLatch.AddCount(1); WorkerPool.ScheduleWork( [&, BatchSizes, RequestIndex]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); CbPackage Package; if (m_MaxAttachmentCount > 0 && SizeCount > 0) { Package = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex); } else { Package = GeneratePutCacheValueRequest(BatchSizes, RequestIndex); } if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response) { ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex))); } }, WorkerThreadPool::EMode::EnableBacklog); Offset += BatchSizes.size(); RequestIndex++; } WorkLatch.CountDown(); while (!WorkLatch.Wait(1000)) { ZEN_INFO("Creating data, {} requests remaining", WorkLatch.Remaining()); } } //////////////////////////////////////////////////////////////////////////////// // CacheGetSubCmd CacheGetSubCmd::CacheGetSubCmd() : CacheSubCmdBase("get", "Get cache values/records or attachments from a bucket") { m_SubOptions.add_option("", "n", "namespace", "Namespace of the cache entry", cxxopts::value(m_Namespace), ""); m_SubOptions.add_option("", "b", "bucket", "Bucket of the cache entry", cxxopts::value(m_Bucket), ""); m_SubOptions.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), ""); m_SubOptions.add_option("", "a", "attachmenthash", "For a cache entry record, get a particular attachment based on the 'RawHash'", cxxopts::value(m_AttachmentHash), ""); m_SubOptions.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), ""); m_SubOptions.add_option("", "t", "text", "Output content of cache entry record as text", cxxopts::value(m_AsText), ""); m_SubOptions .add_option("", "d", "decompress", "Decompress data when applicable. Default = true", cxxopts::value(m_Decompress), ""); m_SubOptions.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); m_SubOptions.positional_help("namespace bucket valuekey attachmenthash"); } void CacheGetSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { using namespace std::literals; ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "get"}); HttpClient& Http = Service.Http(); if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_SubOptions.help()); } if (m_Bucket.empty()) { throw OptionParseException("'--bucket' is required", m_SubOptions.help()); } if (m_ValueKey.empty()) { throw OptionParseException("'--valuekey' is required", m_SubOptions.help()); } IoHash ValueId; if (!IoHash::TryParse(m_ValueKey, ValueId)) { throw OptionParseException(fmt::format("'--valuekey' ('{}') is malformed", m_ValueKey), m_SubOptions.help()); } IoHash AttachmentHash; if (!m_AttachmentHash.empty()) { if (!IoHash::TryParse(m_AttachmentHash, AttachmentHash)) { throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_SubOptions.help()); } } if (m_OutputPath.empty() && !m_AsText) { throw OptionParseException("'--output-path' is required (or pass '--as-text' to print to stdout)", m_SubOptions.help()); } if (!m_OutputPath.empty()) { if (IsDir(m_OutputPath)) { m_OutputPath = m_OutputPath / (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash); } else { CreateDirectories(m_OutputPath.parent_path()); } } std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, ValueId); if (AttachmentHash != IoHash::Zero) { Url = fmt::format("{}/{}", Url, AttachmentHash); } if (HttpClient::Response Result = Http.Download(Url, std::filesystem::temp_directory_path()); Result) { // `Http.Download` parks the payload in the system temp dir and returns // a buffer that already has delete-on-close set, so every exit path // (exception, fallback WriteFile, `--as-text` console print) reaps it. // A successful MoveToFile below clears the flag so the payload's // handle-close doesn't delete the caller's output afterwards. auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) { return Compressed.Decompress().AsIoBuffer(); }; return Buffer; }; IoBuffer ChunkData = m_Decompress ? TryDecompress(Result.ResponsePayload) : Result.ResponsePayload; if (m_AsText) { std::string StringData = Result.ToText(); if (m_OutputPath.empty()) { ZEN_CONSOLE("{}", StringData); } else { WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, StringData.data(), StringData.length())); ZEN_CONSOLE("Wrote {} to '{}' ({})", NiceBytes(StringData.length()), m_OutputPath, ToString(ChunkData.GetContentType())); } } else { if (MoveToFile(m_OutputPath, ChunkData)) { // The file was renamed into place; clearing DeleteOnClose prevents // the move'd-out file at m_OutputPath from being deleted when the // payload's handle closes. When m_Decompress is false ChunkData // shares a core with ResponsePayload so either clear suffices; // when decompressed ChunkData is in-memory and MoveToFile would // have failed, so we don't reach this branch. Result.ResponsePayload.SetDeleteOnClose(false); } else { WriteFile(m_OutputPath, ChunkData); } ZEN_CONSOLE("Wrote {} to '{}' ({})", NiceBytes(ChunkData.GetSize()), m_OutputPath, ToString(ChunkData.GetContentType())); } } else { Result.ThrowError("Failed to fetch data"sv); } } //////////////////////////////////////////////////////////////////////////////// // CacheRecordSubCmd CacheRecordSubCmd::CacheRecordSubCmd() : CacheSubCmdBase("record", "Start recording cache rpc requests ('cache record '), or stop ('cache record stop')") { m_SubOptions.add_option("", "p", "path", "Recording file path, or 'stop' to stop recording", cxxopts::value(m_Path), ""); m_SubOptions.parse_positional("path"); m_SubOptions.positional_help("|stop"); } void CacheRecordSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ResolveHost(); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "record"}); HttpClient& Http = Service.Http(); if (m_Path == "stop") { if (HttpClient::Response Response = Http.Post("/z$/exec$/stop-recording"sv)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Failed to stop recording"); } return; } if (m_Path.empty()) { throw OptionParseException("recording path is required (use '' to start, 'stop' to stop)", m_SubOptions.help()); } if (HttpClient::Response Response = Http.Post("/z$/exec$/start-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_Path}}))) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Failed to start recording"); } } //////////////////////////////////////////////////////////////////////////////// // CacheReplaySubCmd CacheReplaySubCmd::CacheReplaySubCmd() : CacheSubCmdBase("replay", "Replays a previously recorded session of rpc requests") { m_SubOptions.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), ""); m_SubOptions.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), ""); m_SubOptions.add_option("", "w", "numthreads", "Number of worker threads per process", cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())), ""); m_SubOptions.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), ""); m_SubOptions.add_option("", "", "showmethodstats", "Show statistics of which RPC methods are used", cxxopts::value(m_ShowMethodStats), ""); m_SubOptions.add_option("", "", "offset", "Offset into request recording to start replay", cxxopts::value(m_Offset)->default_value("0"), ""); m_SubOptions.add_option("", "", "stride", "Stride for request recording when replaying requests", cxxopts::value(m_Stride)->default_value("1"), ""); m_SubOptions.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), ""); m_SubOptions.add_option("", "", "forceallowlocalrefs", "Force enable local refs in requests", cxxopts::value(m_ForceAllowLocalRefs), ""); m_SubOptions .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), ""); m_SubOptions.add_option("", "", "forceallowlocalhandlerefs", "Force enable local refs as handles in requests", cxxopts::value(m_ForceAllowLocalHandleRef), ""); m_SubOptions.add_option("", "", "disablelocalhandlerefs", "Force disable local refs as handles in requests", cxxopts::value(m_DisableLocalHandleRefs), ""); m_SubOptions.add_option("", "", "forceallowpartiallocalrefs", "Force enable local refs for all sizes", cxxopts::value(m_ForceAllowPartialLocalRefs), ""); m_SubOptions.add_option("", "", "disablepartiallocalrefs", "Force disable local refs for all sizes", cxxopts::value(m_DisablePartialLocalRefs), ""); m_SubOptions.parse_positional("path"); } void CacheReplaySubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { if (m_RecordingPath.empty()) { throw OptionParseException("'--path' is required", m_SubOptions.help()); } if (!IsDir(m_RecordingPath)) { throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } if (m_Stride == 0) { throw OptionParseException("'--stride' must be >= 1", m_SubOptions.help()); } m_ThreadCount = Max(m_ThreadCount, 1); ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = "replay"}); m_HostName = Service.HostSpec(); ZEN_CONSOLE("Replay '{}' (start offset {}, stride {}) to '{}', {} threads", m_RecordingPath, m_Offset, m_Stride, m_HostName, m_ThreadCount); Stopwatch TotalTimer; if (m_OnHost) { HttpClient& Http = Service.Http(); if (HttpClient::Response Response = Http.Post("/z$/exec$/replay-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}))) { ZEN_CONSOLE("{}", Response.ToText()); return; } else { Response.ThrowError("Failed to start replay"); } } std::unique_ptr Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); uint64_t EntryCount = Replayer->GetRequestCount(); if (m_Offset >= EntryCount) { ZEN_CONSOLE("Offset {} is at or past the end of the recording ({} entries); nothing to replay", m_Offset, EntryCount); return; } std::atomic_uint64_t EntryOffset = m_Offset; std::atomic_uint64_t BytesSent = 0; std::atomic_uint64_t BytesReceived = 0; Stopwatch Timer; // The subcommand API does not receive argv, so look the zen executable path // up from the current process to spawn child workers. const std::filesystem::path SelfExePath = GetRunningExecutablePath(); if (m_ProcessCount > 1) { std::vector> WorkerProcesses; WorkerProcesses.resize(m_ProcessCount); ProcessMonitor Monitor; for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) { std::string CommandLine = fmt::format("{} cache replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv, SelfExePath.string(), m_HostName, m_RecordingPath, m_Stride == 1 ? 0 : m_Offset + ProcessIndex, m_Stride, m_ThreadCount, 1); CreateProcResult Result(CreateProc(SelfExePath, CommandLine)); WorkerProcesses[ProcessIndex] = std::make_unique(); WorkerProcesses[ProcessIndex]->Initialize(Result); Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid()); } while (Monitor.IsRunning()) { ZEN_CONSOLE("Waiting for worker processes..."); Sleep(1000); } return; } else { std::map MethodTypes; RwLock MethodTypesLock; WorkerThreadPool WorkerPool(m_ThreadCount); Latch WorkLatch(m_ThreadCount); for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) { WorkerPool.ScheduleWork( [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::map LocalMethodTypes; auto ReduceTypes = MakeGuard([&] { RwLock::ExclusiveLockScope __(MethodTypesLock); for (auto& Entry : LocalMethodTypes) { MethodTypes[Entry.first] += Entry.second; } }); HttpClient Http = CacheCommand::CreateHttpClient(m_HostName); uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { IoBuffer Payload; const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) { CbPackage RequestPackage; CbObject Request; switch (RequestInfo.ContentType) { case ZenContentType::kCbPackage: { if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) { Request = RequestPackage.GetObject(); } } break; case ZenContentType::kCbObject: { Request = LoadCompactBinaryObject(Payload); } break; } RpcAcceptOptions OriginalAcceptOptions = static_cast(Request["AcceptFlags"sv].AsUInt16(0u)); int OriginalProcessPid = Request["Pid"sv].AsInt32(0); int AdjustedPid = 0; RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; if (!m_DisableLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) { AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; if (!m_DisablePartialLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || m_ForceAllowPartialLocalRefs) { AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; } } if (!m_DisableLocalHandleRefs) { if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) { AdjustedPid = GetCurrentProcessId(); } } } } if (m_ShowMethodStats) { std::string MethodName = std::string(Request["Method"sv].AsString()); if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) { It->second++; } else { LocalMethodTypes[MethodName] = 1; } } if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) { CbObjectWriter RequestCopyWriter; for (const CbFieldView& Field : Request) { if (!Field.HasName()) { RequestCopyWriter.AddField(Field); continue; } std::string_view FieldName = Field.GetName(); if (FieldName == "Pid"sv) { continue; } if (FieldName == "AcceptFlags"sv) { continue; } RequestCopyWriter.AddField(FieldName, Field); } if (AdjustedPid != 0) { RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); } if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) { RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast(AdjustedAcceptOptions)); } if (RequestInfo.ContentType == ZenContentType::kCbPackage) { RequestPackage.SetObject(RequestCopyWriter.Save()); std::vector Buffers = FormatPackageMessage(RequestPackage); std::vector SharedBuffers(Buffers.begin(), Buffers.end()); Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); } else { RequestCopyWriter.Finalize(); Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); RequestCopyWriter.Save(Payload.GetMutableView()); } } if (!m_DryRun) { Http.SetSessionId(RequestInfo.SessionId); Payload.SetContentType(RequestInfo.ContentType); HttpClient::Response Response = Http.Post("/z$/$rpc", Payload, {HttpClient::Accept(RequestInfo.AcceptType)}); BytesSent.fetch_add(Payload.GetSize()); if (!Response) { ZEN_CONSOLE_ERROR("{}", Response); break; } BytesReceived.fetch_add(Response.DownloadedBytes); } } EntryIndex = EntryOffset.fetch_add(m_Stride); } }, WorkerThreadPool::EMode::EnableBacklog); } while (!WorkLatch.Wait(1000)) { // EntryCount > m_Offset is guaranteed by the early-return above. // EntryOffset atomically overshoots EntryCount (fetch_add past the // end) when the workload finishes, so clamp before subtracting. const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; const uint64_t CurrentOffset = EntryOffset.load(); const uint64_t RequestsRemaining = CurrentOffset < EntryCount ? (EntryCount - CurrentOffset) / m_Stride : 0; const uint64_t PercentDone = RequestsTotal > 0 ? (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal : 100; ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", PercentDone, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), RequestsTotal, RequestsRemaining, NiceBytes(BytesSent.load()), NiceBytes(BytesReceived.load())); } if (m_ShowMethodStats) { for (const auto& It : MethodTypes) { ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); } } } const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); const uint64_t Sent = BytesSent.load(); const uint64_t Received = BytesReceived.load(); ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", RequestsSent, NiceRate(RequestsSent, ElapsedMS, "req"), NiceBytes(Sent), NiceByteRate(Sent, ElapsedMS), NiceBytes(Received), NiceByteRate(Received, ElapsedMS), NiceTimeSpanMs(ElapsedMS), NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); } } // namespace zen