diff options
| author | Dan Engelbrecht <[email protected]> | 2024-09-18 10:30:29 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-09-18 10:30:29 +0200 |
| commit | f8191d5fba3bfbc93b33cf47a3b16853219c105a (patch) | |
| tree | e1bd4ef561ca76f3d42faa03aff60f05aec71881 /src | |
| parent | Merge pull request #161 from ue-foundation/mr/sym-not-loading (diff) | |
| download | zen-f8191d5fba3bfbc93b33cf47a3b16853219c105a.tar.xz zen-f8191d5fba3bfbc93b33cf47a3b16853219c105a.zip | |
cache generate command to create large data sets for testing (#159)
* add CacheGenerateCommand
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/cache_cmd.cpp | 211 | ||||
| -rw-r--r-- | src/zen/cmds/cache_cmd.h | 24 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 2 |
3 files changed, 236 insertions, 1 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp
index 823f10f1c..eecc3494e 100644
--- a/src/zen/cmds/cache_cmd.cpp
+++ b/src/zen/cmds/cache_cmd.cpp
@@ -2,13 +2,21 @@
 
 #include "cache_cmd.h"
 
+#include <zencore/compress.h>
 #include <zencore/except.h>
 #include <zencore/filesystem.h>
 #include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
 #include <zenhttp/httpcommon.h>
-#include <zenutil/zenserverprocess.h>
+#include <zenutil/cache/cacherequests.h>
+#include <zenutil/packageformat.h>
 
+#include <atomic>
 #include <memory>
+#include <random>
 
 ZEN_THIRD_PARTY_INCLUDES_START
 #include <cpr/cpr.h>
@@ -16,6 +24,49 @@ ZEN_THIRD_PARTY_INCLUDES_END
 
 namespace zen {
 
+namespace {
+    // Returns a buffer of Size bytes filled with pseudo-random data from a SplitMix64 style generator.
+    // The generator state is shared by all callers and is atomic because blobs are created concurrently on
+    // the worker threads scheduled by CacheGenerateCommand::Run.
+    IoBuffer CreateRandomBlob(uint64_t Size)
+    {
+        static std::atomic<uint64_t> Seed{0x7CEBF54E45B9F5D1};
+        auto Next = [](std::atomic<uint64_t>& seed) {
+            constexpr uint64_t Increment = UINT64_C(0x9E3779B97F4A7C15);
+            // fetch_add returns the previous state; add the increment again to get the advanced state
+            uint64_t z = seed.fetch_add(Increment, std::memory_order_relaxed) + Increment;
+            z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9);
+            z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB);
+            return z ^ (z >> 31);
+        };
+
+        IoBuffer Data(Size);
+        uint64_t* DataPtr = reinterpret_cast<uint64_t*>(Data.MutableData());
+        // Fill whole 64-bit words while more than one word remains
+        while (Size > sizeof(uint64_t))
+        {
+            *DataPtr++ = Next(Seed);
+            Size -= sizeof(uint64_t);
+        }
+        // Spread one more random word over the remaining bytes (at most eight)
+        uint64_t ByteNext = Next(Seed);
+        uint8_t* ByteDataPtr = reinterpret_cast<uint8_t*>(DataPtr);
+        while (Size > 0)
+        {
+            *ByteDataPtr++ = static_cast<uint8_t>(ByteNext & 0xff);
+            ByteNext >>= 8;
+            Size--;
+        }
+        return Data;
+    }
+
+    // Compresses Buffer with Oodle (Mermaid compressor, SuperFast level).
+    CompressedBuffer CompressBlob(IoBuffer&& Buffer)
+    {
+        return CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::SuperFast);
+    }
+}  // namespace
+
 DropCommand::DropCommand()
 {
     m_Options.add_options()("h,help", "Print help");
@@ -305,4 +356,203 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
     return 1;
 }
 
+// Registers the command line options; namespace, bucket and count may also be given positionally.
+CacheGenerateCommand::CacheGenerateCommand()
+{
+    m_Options.add_options()("h,help", "Print help");
+    m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+    m_Options
+        .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>");
+    m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>");
+    m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>");
+    m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>");
+    m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>");
+    m_Options.add_option("",
+                         "",
+                         "min-attachments",
+                         "Minimum number of attachments when creating record based values",
+                         cxxopts::value(m_MinAttachmentCount),
+                         "<minattachments>");
+    m_Options.add_option("",
+                         "",
+                         "max-attachments",
+                         "Maximum number of attachments when creating record based values, 0 to only create cache values",
+                         cxxopts::value(m_MaxAttachmentCount),
+                         "<maxattachments>");
+    m_Options.parse_positional({"namespace", "bucket", "count"});
+    m_Options.positional_help("namespace bucket count");
+}
+
+CacheGenerateCommand::~CacheGenerateCommand() = default;
+
+// Generates m_Count random payload sizes, groups them into batches and posts each batch to the
+// server's "/z$/$rpc" endpoint from a worker thread, either as one cache record with attachments
+// or as a set of plain cache values. Returns 0 on every non-throwing path.
+// Throws OptionParseException when the host cannot be resolved or the size/attachment limits are inconsistent.
+int
+CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+    ZEN_UNUSED(GlobalOptions);
+
+    if (!ParseOptions(argc, argv))
+    {
+        return 0;
+    }
+
+    m_HostName = ResolveTargetHostSpec(m_HostName);
+
+    if (m_HostName.empty())
+    {
+        throw zen::OptionParseException("unable to resolve server specification");
+    }
+
+    if (m_MaxSize == 0 && m_MinSize == 0)
+    {
+        m_MinSize = 17;
+        if (m_MaxAttachmentCount == 0)
+        {
+            // For cache values this max size will result in about 0.5% of values being saved as loose file in a cache bucket
+            m_MaxSize = 65u * 1024u;
+        }
+        else
+        {
+            // For cache records this max size will result in about 0.5% of attachments being saved as loose files in cas
+            m_MaxSize = 768u * 1024u;
+        }
+    }
+
+    // A minimum of zero would wrap around in `m_MinSize - 1` below and never grow when doubled
+    if (m_MinSize == 0)
+    {
+        m_MinSize = 1;
+    }
+    if (m_MaxSize < m_MinSize)
+    {
+        throw zen::OptionParseException("'--max-size' must be greater than or equal to '--min-size'");
+    }
+    if (m_MinAttachmentCount > m_MaxAttachmentCount)
+    {
+        throw zen::OptionParseException("'--min-attachments' must not be greater than '--max-attachments'");
+    }
+
+    // Size classes start at the minimum size and double until the maximum is reached; every class
+    // gets a distribution adding a random amount in [0, class size) on top of the class size.
+    // NOTE(review): because of that addition a generated size can exceed --max-size - confirm this is intended
+    std::vector<std::uniform_int_distribution<uint64_t>> Variations;
+    std::vector<size_t> SizeRanges;
+    SizeRanges.push_back(m_MinSize);
+    Variations.push_back(std::uniform_int_distribution<uint64_t>(0, m_MinSize - 1));
+    while (SizeRanges.back() < m_MaxSize)
+    {
+        SizeRanges.push_back(SizeRanges.back() * 2);
+        Variations.push_back(std::uniform_int_distribution<uint64_t>(0, SizeRanges.back() - 1));
+    }
+    if (SizeRanges.back() > m_MaxSize)
+    {
+        // Clamp the last class and replace (not append) its distribution so both vectors stay index aligned
+        SizeRanges.back() = m_MaxSize;
+        Variations.back() = std::uniform_int_distribution<uint64_t>(0, m_MaxSize - 1);
+    }
+
+    std::random_device RandomDevice;
+    std::mt19937 Generator(RandomDevice());
+    std::uniform_int_distribution<uint64_t> SizeRangeDistribution(0, SizeRanges.size() - 1);
+
+    std::vector<uint64_t> Sizes;
+    Sizes.reserve(m_Count);
+    for (uint64_t n = 0; n != m_Count; ++n)
+    {
+        uint64_t Range = SizeRangeDistribution(Generator);
+        uint64_t Size = SizeRanges[Range];
+        uint64_t Variation = Variations[Range](Generator);
+        Sizes.push_back(Size + Variation);
+    }
+
+    std::uniform_int_distribution<uint64_t> KeyDistribution;
+
+    HttpClient Http(m_HostName);
+
+    // The request builders run on worker threads, so they must not touch Generator or the distributions
+    // (neither is thread safe); the random KeyBase is drawn by the scheduling loop and passed in instead.
+    auto GeneratePutCacheValueRequest([this](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex, uint64_t KeyBase) {
+        cacherequests::PutCacheValuesRequest Request({.AcceptMagic = kCbPkgMagic, .Namespace = m_Namespace});
+        uint64_t ValueIndex = 0;
+        for (std::uint64_t ValueSize : BatchSizes)
+        {
+            // RequestIndex and ValueIndex keep the key strings distinct within one run, KeyBase varies them between runs
+            std::string KeyString = fmt::format("{}-{}-{}-{}", RequestIndex, KeyBase, ValueIndex++, ValueSize);
+            IoHash ValueKey = IoHash::HashBuffer(KeyString.c_str(), KeyString.length());
+
+            Request.Requests.emplace_back(cacherequests::PutCacheValueRequest{.Key = {.Bucket = m_Bucket, .Hash = ValueKey},
+                                                                              .Body = CompressBlob(CreateRandomBlob(ValueSize))});
+        }
+        return Request;
+    });
+
+    auto GeneratePutCacheRecordRequest([this](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex, uint64_t KeyBase) {
+        cacherequests::PutCacheRecordsRequest Request({.AcceptMagic = kCbPkgMagic, .Namespace = m_Namespace});
+        std::string RecordKeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, BatchSizes.size());
+        IoHash RecordKey = IoHash::HashBuffer(RecordKeyString.c_str(), RecordKeyString.length());
+
+        Request.Requests.emplace_back(cacherequests::PutCacheRecordRequest{.Key = {.Bucket = m_Bucket, .Hash = RecordKey}});
+        for (std::uint64_t ValueSize : BatchSizes)
+        {
+            Request.Requests.back().Values.push_back({.Id = Oid::NewOid(), .Body = CompressBlob(CreateRandomBlob(ValueSize))});
+        }
+        return Request;
+    });
+
+    WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 2u), 2u)));
+    Latch WorkLatch(1);
+
+    // With attachments enabled a draw of zero produces a plain cache value request instead of a record
+    std::uniform_int_distribution<uint32_t> SizeCountDistribution(m_MaxAttachmentCount > 0 ? m_MinAttachmentCount : 1,
+                                                                  m_MaxAttachmentCount > 0 ? m_MaxAttachmentCount : 8);
+
+    std::size_t Offset = 0;
+    uint64_t RequestIndex = 0;
+    while (Offset < Sizes.size())
+    {
+        // All random draws happen on this thread; the worker only receives copies of the results
+        size_t SizeCount = SizeCountDistribution(Generator);
+        const uint64_t KeyBase = KeyDistribution(Generator);
+        const bool CreateRecord = m_MaxAttachmentCount > 0 && SizeCount > 0;
+        std::span<uint64_t> BatchSizes = std::span<uint64_t>(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset));
+
+        WorkLatch.AddCount(1);
+        // Per-iteration values are captured by value since the work item runs after this iteration has ended
+        WorkerPool.ScheduleWork(
+            [&WorkLatch, &Http, &GeneratePutCacheRecordRequest, &GeneratePutCacheValueRequest, BatchSizes, RequestIndex, KeyBase, CreateRecord]() {
+                auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+                CbPackage Package;
+                if (CreateRecord)
+                {
+                    // NOTE(review): Format() is what fills Package; this relies on ZEN_ASSERT always evaluating its argument - confirm
+                    auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex, KeyBase);
+                    ZEN_ASSERT(Request.Format(Package));
+                }
+                else
+                {
+                    auto Request = GeneratePutCacheValueRequest(BatchSizes, RequestIndex, KeyBase);
+                    ZEN_ASSERT(Request.Format(Package));
+                }
+
+                if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response)
+                {
+                    ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex)));
+                }
+            });
+        Offset += BatchSizes.size();
+        RequestIndex++;
+    }
+
+    WorkLatch.CountDown();
+    while (!WorkLatch.Wait(1000))
+    {
+        ZEN_INFO("Creating data, {} requests remaining", WorkLatch.Remaining());
+    }
+
+    return 0;
+}
+
 } // namespace zen
diff --git a/src/zen/cmds/cache_cmd.h b/src/zen/cmds/cache_cmd.h
index 80079c452..b8181d5a2 100644
--- a/src/zen/cmds/cache_cmd.h
+++ b/src/zen/cmds/cache_cmd.h
@@ -69,4 +69,29 @@ private:
     std::string m_ValueKey;
 };
 
+// Command ("cache-gen") that fills a cache bucket with randomly generated values/records for testing.
+class CacheGenerateCommand : public CacheStoreCommand
+{
+public:
+    static constexpr char Name[] = "cache-gen";
+    static constexpr char Description[] = "Generates cache values into a bucket";
+
+    CacheGenerateCommand();
+    ~CacheGenerateCommand();
+    virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+    virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+    cxxopts::Options m_Options{Name, Description};
+    std::string m_HostName;
+    std::string m_Namespace;
+    std::string m_Bucket;
+    uint64_t m_Count = 1;  // number of payload sizes to generate (--count)
+
+    uint64_t m_MinSize = 0;  // min and max both 0 selects the built-in size defaults in Run()
+    uint64_t m_MaxSize = 0;
+    uint32_t m_MinAttachmentCount = 0;
+    uint32_t m_MaxAttachmentCount = 0;  // 0 = only create plain cache values
+};
+
 } // namespace zen
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 0f21de130..79338f6f3 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -290,6 +290,7 @@ main(int argc, char** argv)
     AttachCommand AttachCmd;
     BenchCommand BenchCmd;
     CacheDetailsCommand CacheDetailsCmd;
+    CacheGenerateCommand CacheGenerateCmd;
     CacheInfoCommand CacheInfoCmd;
     CacheStatsCommand CacheStatsCmd;
     CopyCommand CopyCmd;
@@ -343,6 +344,7 @@ main(int argc, char** argv)
     {"bench", &BenchCmd, "Utility command for benchmarking"},
     {"cache-details", &CacheDetailsCmd, "Details on cache"},
     {"cache-info", &CacheInfoCmd, "Info on cache, namespace or bucket"},
+    {CacheGenerateCommand::Name, &CacheGenerateCmd, CacheGenerateCommand::Description},
     {"cache-stats", &CacheStatsCmd, "Stats on cache"},
     {"copy", &CopyCmd, "Copy file(s)"},
     {"copy-state", &CopyStateCmd, "Copy zen server disk state"},