// Copyright Epic Games, Inc. All Rights Reserved. #include "cache_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { namespace { IoBuffer CreateRandomBlob(uint64_t Size) { static uint64_t Seed{0x7CEBF54E45B9F5D1}; auto Next = [](uint64_t& seed) { uint64_t z = (seed += UINT64_C(0x9E3779B97F4A7C15)); z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9); z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB); return z ^ (z >> 31); }; IoBuffer Data(Size); uint64_t* DataPtr = reinterpret_cast(Data.MutableData()); while (Size > sizeof(uint64_t)) { *DataPtr++ = Next(Seed); Size -= sizeof(uint64_t); } uint64_t ByteNext = Next(Seed); uint8_t* ByteDataPtr = reinterpret_cast(DataPtr); while (Size > 0) { *ByteDataPtr++ = static_cast(ByteNext & 0xff); ByteNext >>= 8; Size--; } return Data; }; CompressedBuffer CompressBlob(IoBuffer&& Buffer) { return CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::SuperFast); } } // namespace DropCommand::DropCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), ""); m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), ""); m_Options.parse_positional({"namespace", "bucket"}); } DropCommand::~DropCommand() = default; void DropCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } if (m_NamespaceName.empty()) { throw OptionParseException("'--namespace' is required", m_Options.help()); } std::string Url; std::string DropDescription; if (m_BucketName.empty()) { DropDescription = fmt::format("cache namespace '{}' from '{}'", m_NamespaceName, m_HostName); Url = fmt::format("/z$/{}", m_NamespaceName); } else { DropDescription = fmt::format("cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } ZEN_CONSOLE("Dropping {}", DropDescription); HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Delete(Url)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError(fmt::format("Failed to drop {}", DropDescription)); } } CacheInfoCommand::CacheInfoCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); m_Options.add_option("", "n", "namespace", "Namespace name", cxxopts::value(m_NamespaceName), ""); m_Options.add_option("", "", "bucketsizes", "Comma delimited list of bucket names to get size info from, * to get info on all buckets", cxxopts::value(m_SizeInfoBucketNames), ""); m_Options.add_option("", "b", "bucket", "Bucket name", cxxopts::value(m_BucketName), ""); m_Options.add_option("", "", "bucketsize", "Show detailed bucket size info", cxxopts::value(m_BucketSizeInfo), ""); m_Options.parse_positional({"namespace", "bucket"}); } CacheInfoCommand::~CacheInfoCommand() = default; void CacheInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } std::string Url; if (m_HostName.empty()) { if (!m_SizeInfoBucketNames.empty()) { throw OptionParseException("'--bucketsizes' requires '--namespace'", m_Options.help()); } if (m_BucketSizeInfo) { throw OptionParseException("'--bucketsize' requires '--namespace' and '--bucket'", m_Options.help()); } ZEN_CONSOLE("Info on cache from '{}'", m_HostName); Url = "/z$"; } else if (m_BucketName.empty()) { if (m_BucketSizeInfo) { throw OptionParseException(fmt::format("'--bucketsize' requires '--namespace' and '--bucket' ('{}')", m_BucketName), m_Options.help()); } ZEN_CONSOLE("Info on cache namespace '{}' from '{}'", m_NamespaceName, m_HostName); Url = fmt::format("/z$/{}", m_NamespaceName); } else { if (!m_SizeInfoBucketNames.empty()) { throw OptionParseException("'--bucketsizes' conflicts with '--bucket'", m_Options.help()); } ZEN_CONSOLE("Info on cache bucket '{}/{}' from '{}'", m_NamespaceName, m_BucketName, m_HostName); Url = fmt::format("/z$/{}/{}", m_NamespaceName, m_BucketName); } HttpClient::KeyValueMap Parameters; if (!m_SizeInfoBucketNames.empty()) { Parameters.Entries.insert({"bucketsizes", m_SizeInfoBucketNames}); } if (m_BucketSizeInfo) { Parameters.Entries.insert({"bucketsize", "true"}); } HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON), Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Info failed"); } } CacheStatsCommand::CacheStatsCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); } CacheStatsCommand::~CacheStatsCommand() = default; void CacheStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Get("/stats/z$", HttpClient::Accept(ZenContentType::kJSON))) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Info failed"); } } CacheDetailsCommand::CacheDetailsCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); m_Options.add_option("", "c", "csv", "Info on csv format", cxxopts::value(m_CSV), ""); m_Options.add_option("", "d", "details", "Get detailed information about records", cxxopts::value(m_Details), "
"); m_Options.add_option("", "a", "attachmentdetails", "Get detailed information about attachments", cxxopts::value(m_AttachmentDetails), ""); m_Options.add_option("", "n", "namespace", "Namespace name to get info for", cxxopts::value(m_Namespace), ""); m_Options.add_option("", "b", "bucket", "Filter on bucket name", cxxopts::value(m_Bucket), ""); m_Options.add_option("", "v", "valuekey", "Filter on value key hash string", cxxopts::value(m_ValueKey), ""); } CacheDetailsCommand::~CacheDetailsCommand() = default; void CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } HttpClient::KeyValueMap Parameters; if (m_Details) { Parameters.Entries.insert({"details", "true"}); } if (m_AttachmentDetails) { Parameters.Entries.insert({"attachmentdetails", "true"}); } HttpClient::KeyValueMap Headers; if (m_CSV) { Parameters.Entries.insert({"csv", "true"}); } else { Headers = HttpClient::Accept(ZenContentType::kJSON); } std::string Url; if (!m_ValueKey.empty()) { if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_Options.help()); } if (m_Bucket.empty()) { throw OptionParseException("'--bucket' is required", m_Options.help()); } Url = fmt::format("/z$/details$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); } else if (!m_Bucket.empty()) { if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_Options.help()); } Url = fmt::format("/z$/details$/{}/{}", m_Namespace, m_Bucket); } else if (!m_Namespace.empty()) { Url = fmt::format("/z$/details$/{}", m_Namespace); } else { Url = "/z$/details$"; } HttpClient Http = CreateHttpClient(m_HostName); if (HttpClient::Response Response = Http.Get(Url, Headers, Parameters)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Info failed"); } } CacheGenerateCommand::CacheGenerateCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); m_Options .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), ""); m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), ""); m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), ""); m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), ""); m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), ""); m_Options.add_option("", "", "min-attachments", "Minimum number of attachments when creating record based values", cxxopts::value(m_MinAttachmentCount), ""); m_Options.add_option("", "", "max-attachments", "Minimum number of attachments when creating record based values, 0 to only create cache values", cxxopts::value(m_MaxAttachmentCount), ""); m_Options.parse_positional({"namespace", "bucket", "count"}); m_Options.positional_help("namespace bucket count"); } CacheGenerateCommand::~CacheGenerateCommand() = default; void CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } if (m_MaxSize == 0 && m_MinSize == 0) { m_MinSize = 17; if (m_MaxAttachmentCount == 0) { // For cache values this max size will result in about 0.5% of values being saved as loose file in a cache bucket m_MaxSize = 65u * 1024u; } else { // For cache records this max size will result in about 0.5% of attachments begin saved as loose files in cas m_MaxSize = 768u * 1024u; } } std::vector> Variations; std::vector SizeRanges; SizeRanges.push_back(m_MinSize); Variations.push_back(std::uniform_int_distribution(0, m_MinSize - 1)); while (SizeRanges.back() < m_MaxSize) { SizeRanges.push_back(SizeRanges.back() * 2); Variations.push_back(std::uniform_int_distribution(0, SizeRanges.back() - 1)); } if (SizeRanges.back() > m_MaxSize) { SizeRanges.back() = m_MaxSize; Variations.push_back(std::uniform_int_distribution(0, m_MaxSize - 1)); } std::random_device RandomDevice; std::mt19937 Generator(RandomDevice()); std::uniform_int_distribution SizeRangeDistribution(0, SizeRanges.size() - 1); std::vector Sizes; Sizes.reserve(m_Count); for (uint64_t n = 0; n != m_Count; ++n) { uint64_t Range = SizeRangeDistribution(Generator); uint64_t Size = SizeRanges[Range]; uint64_t Variation = Variations[Range](Generator); Sizes.push_back(Size + Variation); } std::uniform_int_distribution KeyDistribution; HttpClient Http = CreateHttpClient(m_HostName); auto GeneratePutCacheValueRequest( [this, &KeyDistribution, &Generator](std::span BatchSizes, uint64_t RequestIndex) -> CbPackage { CbPackage Package; CbObjectWriter Writer; Writer << "Method" << "PutCacheValues"; Writer << "Accept" << kCbPkgMagic; Writer.BeginObject("Params"); { Writer << "DefaultPolicy" << WriteToString<128>(CachePolicy::Default); Writer << "Namespace" << m_Namespace; Writer.BeginArray("Requests"); for (std::uint64_t ValueSize : BatchSizes) { Writer.BeginObject(); { uint64_t KeyBase = KeyDistribution(Generator); std::string KeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, ValueSize); IoHash ValueKey = IoHash::HashBuffer(KeyString.c_str(), KeyString.length()); Writer.BeginObject("Key"); { Writer << "Bucket" << m_Bucket; Writer << "Hash" << ValueKey; } Writer.EndObject(); // Key CompressedBuffer Payload = CompressBlob(CreateRandomBlob(ValueSize)); Writer.AddBinaryAttachment("RawHash", Payload.DecodeRawHash()); Package.AddAttachment(CbAttachment(Payload, Payload.DecodeRawHash())); } Writer.EndObject(); } Writer.EndArray(); // Requests } Writer.EndObject(); // Params Package.SetObject(Writer.Save()); return Package; }); auto GeneratePutCacheRecordRequest([this, &KeyDistribution, &Generator](std::span BatchSizes, uint64_t RequestIndex) { CbPackage Package; CbObjectWriter Writer; Writer << "Method" << "PutCacheRecords"; Writer << "Accept" << kCbPkgMagic; Writer.BeginObject("Params"); { Writer << "DefaultPolicy" << WriteToString<128>(CachePolicy::Default); Writer << "Namespace" << m_Namespace; Writer.BeginArray("Requests"); { Writer.BeginObject(); { Writer.BeginObject("Record"); { uint64_t KeyBase = KeyDistribution(Generator); std::string RecordKeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, BatchSizes.size()); IoHash RecordKey = IoHash::HashBuffer(RecordKeyString.c_str(), RecordKeyString.length()); Writer.BeginObject("Key"); { Writer << "Bucket" << m_Bucket; Writer << "Hash" << RecordKey; } Writer.EndObject(); // Key Writer.BeginArray("Values"); for (std::uint64_t ValueSize : BatchSizes) { Writer.BeginObject(); { Writer.AddObjectId("Id", Oid::NewOid()); CompressedBuffer Payload = CompressBlob(CreateRandomBlob(ValueSize)); Writer.AddBinaryAttachment("RawHash", Payload.DecodeRawHash()); Package.AddAttachment(CbAttachment(Payload, Payload.DecodeRawHash())); Writer.AddInteger("RawSize", Payload.DecodeRawSize()); } Writer.EndObject(); } Writer.EndArray(); // Values } Writer.EndObject(); // Record } Writer.EndObject(); } Writer.EndArray(); // Requests } Writer.EndObject(); // Params Package.SetObject(Writer.Save()); return Package; }); WorkerThreadPool WorkerPool(gsl::narrow(Max((GetHardwareConcurrency() / 2u), 2u))); Latch WorkLatch(1); std::uniform_int_distribution SizeCountDistribution(m_MaxAttachmentCount > 0 ? 0 : 1, m_MaxAttachmentCount > 0 ? m_MaxAttachmentCount : 8); std::size_t Offset = 0; uint64_t RequestIndex = 0; while (Offset < Sizes.size()) { size_t SizeCount = SizeCountDistribution(Generator); std::span BatchSizes = std::span(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset)); WorkLatch.AddCount(1); WorkerPool.ScheduleWork( [&, BatchSizes, RequestIndex]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); CbPackage Package; if (m_MaxAttachmentCount > 0 && SizeCount > 0) { Package = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex); } else { Package = GeneratePutCacheValueRequest(BatchSizes, RequestIndex); } if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response) { ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex))); } }, WorkerThreadPool::EMode::EnableBacklog); Offset += BatchSizes.size(); RequestIndex++; } WorkLatch.CountDown(); while (!WorkLatch.Wait(1000)) { ZEN_INFO("Creating data, {} requests remaining", WorkLatch.Remaining()); } } CacheGetCommand::CacheGetCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), ""); m_Options .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), ""); m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), ""); m_Options.add_option("", "v", "valuekey", "Cache entry iohash id", cxxopts::value(m_ValueKey), ""); m_Options.add_option("", "a", "attachmenthash", "For a cache entry record, get a particular attachment based on the 'RawHash'", cxxopts::value(m_AttachmentHash), ""); m_Options.add_option("", "o", "output-path", "File path for output data", cxxopts::value(m_OutputPath), ""); m_Options.add_option("", "t", "text", "Ouput content of cache entry record as text", cxxopts::value(m_AsText), ""); m_Options .add_option("", "d", "decompress", "Decompress data when applicable. Default = true", cxxopts::value(m_Decompress), ""); m_Options.parse_positional({"namespace", "bucket", "valuekey", "attachmenthash"}); m_Options.positional_help("namespace bucket valuekey attachmenthash"); } CacheGetCommand::~CacheGetCommand() = default; void CacheGetCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); using namespace std::literals; if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } if (m_Namespace.empty()) { throw OptionParseException("'--namespace' is required", m_Options.help()); } if (m_Bucket.empty()) { throw OptionParseException("'--bucket' is required", m_Options.help()); } if (m_ValueKey.empty()) { throw OptionParseException("'--valuekey' is required", m_Options.help()); } IoHash ValueId; if (!IoHash::TryParse(m_ValueKey, ValueId)) { throw OptionParseException(fmt::format("'--value-key' ('{}') is malformed", m_ValueKey), m_Options.help()); } IoHash AttachmentHash; if (!m_AttachmentHash.empty()) { if (!IoHash::TryParse(m_AttachmentHash, AttachmentHash)) { throw OptionParseException(fmt::format("'--attachmenthash' ('{}') is malformed", m_AttachmentHash), m_Options.help()); } } HttpClient Http = CreateHttpClient(m_HostName); if (!m_OutputPath.empty()) { if (IsDir(m_OutputPath)) { m_OutputPath = m_OutputPath / (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash); } else { CreateDirectories(m_OutputPath.parent_path()); } } if (m_OutputPath.empty()) { m_OutputPath = (m_AttachmentHash.empty() ? m_ValueKey : m_AttachmentHash); } std::string Url = fmt::format("/z$/{}/{}/{}", m_Namespace, m_Bucket, m_ValueKey); if (AttachmentHash != IoHash::Zero) { Url = fmt::format("{}/{}", Url, AttachmentHash); } if (HttpClient::Response Result = Http.Download(Url, std::filesystem::temp_directory_path()); Result) { auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) { return Compressed.Decompress().AsIoBuffer(); }; return Buffer; }; IoBuffer ChunkData = m_Decompress ? TryDecompress(Result.ResponsePayload) : Result.ResponsePayload; if (m_AsText) { std::string StringData = Result.ToText(); if (m_OutputPath.empty()) { ZEN_CONSOLE("{}", StringData); } else { WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, StringData.data(), StringData.length())); ZEN_CONSOLE("Wrote {} to '{}' ({})", NiceBytes(StringData.length()), m_OutputPath, ToString(ChunkData.GetContentType())); } } else { if (!MoveToFile(m_OutputPath, ChunkData)) { WriteFile(m_OutputPath, ChunkData); } ZEN_CONSOLE("Wrote {} to '{}' ({})", NiceBytes(ChunkData.GetSize()), m_OutputPath, ToString(ChunkData.GetContentType())); } } else { Result.ThrowError("Failed to fetch data"sv); } } } // namespace zen