aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2024-09-18 10:30:29 +0200
committer GitHub Enterprise <[email protected]> 2024-09-18 10:30:29 +0200
commit f8191d5fba3bfbc93b33cf47a3b16853219c105a (patch)
tree e1bd4ef561ca76f3d42faa03aff60f05aec71881 /src
parent Merge pull request #161 from ue-foundation/mr/sym-not-loading (diff)
download zen-f8191d5fba3bfbc93b33cf47a3b16853219c105a.tar.xz
zen-f8191d5fba3bfbc93b33cf47a3b16853219c105a.zip
cache generate command to create large data sets for testing (#159)
* add CacheGenerateCommand
Diffstat (limited to 'src')
-rw-r--r-- src/zen/cmds/cache_cmd.cpp 211
-rw-r--r-- src/zen/cmds/cache_cmd.h 24
-rw-r--r-- src/zen/zen.cpp 2
3 files changed, 236 insertions, 1 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp
index 823f10f1c..eecc3494e 100644
--- a/src/zen/cmds/cache_cmd.cpp
+++ b/src/zen/cmds/cache_cmd.cpp
@@ -2,13 +2,20 @@
#include "cache_cmd.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
#include <zenhttp/httpcommon.h>
-#include <zenutil/zenserverprocess.h>
+#include <zenutil/cache/cacherequests.h>
+#include <zenutil/packageformat.h>
+#include <atomic>
#include <memory>
+#include <random>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
@@ -16,6 +23,41 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+namespace {
+    /// Creates a buffer of `Size` bytes filled with pseudo-random data.
+    ///
+    /// The data comes from a splitmix64 mixer driven by a single process-wide counter, so
+    /// consecutive calls produce different content. The counter is atomic because
+    /// CacheGenerateCommand::Run calls this function concurrently from worker pool tasks;
+    /// a plain static counter would be a data race.
+    ///
+    /// @param Size Number of bytes to generate.
+    /// @return Buffer of exactly `Size` bytes.
+    IoBuffer CreateRandomBlob(uint64_t Size)
+    {
+        static std::atomic<uint64_t> Seed{0x7CEBF54E45B9F5D1};
+        auto Next = [](std::atomic<uint64_t>& State) {
+            constexpr uint64_t Increment = UINT64_C(0x9E3779B97F4A7C15);
+            // fetch_add returns the previous value; adding the increment again yields the same
+            // post-increment state the non-atomic `seed += Increment` produced.
+            uint64_t z = State.fetch_add(Increment, std::memory_order_relaxed) + Increment;
+            z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9);
+            z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB);
+            return z ^ (z >> 31);
+        };
+
+        IoBuffer Data(Size);
+        // NOTE(review): assumes IoBuffer storage is suitably aligned for uint64_t stores - confirm.
+        uint64_t* DataPtr = reinterpret_cast<uint64_t*>(Data.MutableData());
+        // Fill whole 64-bit words while more than one word remains ...
+        while (Size > sizeof(uint64_t))
+        {
+            *DataPtr++ = Next(Seed);
+            Size -= sizeof(uint64_t);
+        }
+        // ... then spread one final random word over the remaining 1..8 bytes, low byte first.
+        uint64_t ByteNext = Next(Seed);
+        uint8_t* ByteDataPtr = reinterpret_cast<uint8_t*>(DataPtr);
+        while (Size > 0)
+        {
+            *ByteDataPtr++ = static_cast<uint8_t>(ByteNext & 0xff);
+            ByteNext >>= 8;
+            Size--;
+        }
+        return Data;
+    }
+
+    /// Compresses `Buffer` with the Oodle Mermaid compressor at the SuperFast level.
+    /// NOTE(review): `Buffer` is a named rvalue reference and therefore an lvalue inside this
+    /// function, so SharedBuffer is constructed from an lvalue - confirm whether a move was intended.
+    CompressedBuffer CompressBlob(IoBuffer&& Buffer)
+    {
+        const auto Compressor = OodleCompressor::Mermaid;
+        const auto Level      = OodleCompressionLevel::SuperFast;
+        return CompressedBuffer::Compress(SharedBuffer(Buffer), Compressor, Level);
+    }
+} // namespace
+
DropCommand::DropCommand()
{
m_Options.add_options()("h,help", "Print help");
@@ -305,4 +347,171 @@ CacheDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
return 1;
}
+/// Registers the command line options of the cache-gen command.
+/// `namespace`, `bucket` and `count` may also be given positionally, in that order.
+CacheGenerateCommand::CacheGenerateCommand()
+{
+    m_Options.add_options()("h,help", "Print help");
+    m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+    m_Options
+        .add_option("", "n", "namespace", "Namespace to generate cache values/records for", cxxopts::value(m_Namespace), "<namespace>");
+    m_Options.add_option("", "b", "bucket", "Bucket name to generate cache values/records for", cxxopts::value(m_Bucket), "<bucket>");
+    m_Options.add_option("", "", "count", "Number of cache values/records to generate", cxxopts::value(m_Count), "<count>");
+    m_Options.add_option("", "", "min-size", "Minimum size of cache value/attachments", cxxopts::value(m_MinSize), "<min>");
+    m_Options.add_option("", "", "max-size", "Maximum size of cache value/attachments", cxxopts::value(m_MaxSize), "<max>");
+    m_Options.add_option("",
+                         "",
+                         "min-attachments",
+                         "Minimum number of attachments when creating record based values",
+                         cxxopts::value(m_MinAttachmentCount),
+                         "<minattachments>");
+    // The help text previously said "Minimum" here, a copy/paste slip from the option above.
+    m_Options.add_option("",
+                         "",
+                         "max-attachments",
+                         "Maximum number of attachments when creating record based values, 0 to only create cache values",
+                         cxxopts::value(m_MaxAttachmentCount),
+                         "<maxattachments>");
+    m_Options.parse_positional({"namespace", "bucket", "count"});
+    m_Options.positional_help("namespace bucket count");
+}
+
+CacheGenerateCommand::~CacheGenerateCommand() = default;  // Out-of-line definition of the destructor declared in cache_cmd.h
+
+/// Generates `m_Count` random cache values (or cache records with attachments) and uploads
+/// them to the target server in batches, using a worker pool for compression and upload.
+///
+/// Sizes are drawn from doubling buckets between min-size and max-size so that small and
+/// large payloads are both represented; no generated size falls outside [min-size, max-size].
+///
+/// @param GlobalOptions Unused.
+/// @param argc/argv     Command line to parse.
+/// @return 0, also when option parsing requested an early exit.
+/// @throws zen::OptionParseException if the host cannot be resolved or the size/attachment
+///         ranges are inconsistent.
+int
+CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+    ZEN_UNUSED(GlobalOptions);
+
+    if (!ParseOptions(argc, argv))
+    {
+        return 0;
+    }
+
+    m_HostName = ResolveTargetHostSpec(m_HostName);
+
+    if (m_HostName.empty())
+    {
+        throw zen::OptionParseException("unable to resolve server specification");
+    }
+
+    if (m_MaxSize == 0 && m_MinSize == 0)
+    {
+        m_MinSize = 17;
+        if (m_MaxAttachmentCount == 0)
+        {
+            // For cache values this max size is meant to make only a small share of the values large enough to be
+            // saved as loose files in a cache bucket (original estimate: about 0.5%)
+            // NOTE(review): re-check the estimate against the bucket's loose-file threshold
+            m_MaxSize = 65u * 1024u;
+        }
+        else
+        {
+            // For cache records this max size is meant to make only a small share of the attachments large enough to
+            // be saved as loose files in cas (original estimate: about 0.5%)
+            m_MaxSize = 768u * 1024u;
+        }
+    }
+    else if (m_MinSize == 0)
+    {
+        // A zero minimum can never be doubled up to the maximum (0 * 2 == 0), so start from one byte
+        m_MinSize = 1;
+    }
+
+    if (m_MaxSize < m_MinSize)
+    {
+        throw zen::OptionParseException("max-size must be greater than or equal to min-size");
+    }
+    if (m_MaxAttachmentCount > 0 && m_MinAttachmentCount > m_MaxAttachmentCount)
+    {
+        // std::uniform_int_distribution requires its lower bound to not exceed its upper bound
+        throw zen::OptionParseException("min-attachments must not be greater than max-attachments");
+    }
+
+    // Doubling size buckets [Low, High]: [min, 2*min), [2*min, 4*min), ... with the last bucket ending at max.
+    // Picking a bucket uniformly and then a size uniformly inside it gives a roughly logarithmic size spread.
+    std::vector<std::uniform_int_distribution<uint64_t>> SizeBuckets;
+    {
+        uint64_t Low = m_MinSize;
+        while (true)
+        {
+            // True when 2 * Low would pass m_MaxSize; written as a division so it cannot overflow
+            const bool     IsLastBucket = Low > m_MaxSize / 2;
+            const uint64_t High         = IsLastBucket ? m_MaxSize : (Low * 2 - 1);
+            SizeBuckets.emplace_back(Low, High);
+            if (IsLastBucket)
+            {
+                break;
+            }
+            Low *= 2;
+        }
+    }
+
+    std::random_device                    RandomDevice;
+    std::mt19937                          Generator(RandomDevice());
+    std::uniform_int_distribution<size_t> BucketDistribution(0, SizeBuckets.size() - 1);
+
+    std::vector<uint64_t> Sizes;
+    Sizes.reserve(m_Count);
+    for (uint64_t n = 0; n != m_Count; ++n)
+    {
+        const size_t Bucket = BucketDistribution(Generator);
+        Sizes.push_back(SizeBuckets[Bucket](Generator));
+    }
+
+    // Only used on this thread, to draw one key seed per request (see the scheduling loop below)
+    std::uniform_int_distribution<uint64_t> KeyDistribution;
+
+    HttpClient Http(m_HostName);
+
+    // Both request builders run on worker threads. They take the random engine as a parameter so that each task
+    // can bring its own instead of sharing one unsynchronized engine with the scheduling thread.
+    auto GeneratePutCacheValueRequest(
+        [this](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex, std::mt19937_64& KeyGenerator) {
+            cacherequests::PutCacheValuesRequest Request({.AcceptMagic = kCbPkgMagic, .Namespace = m_Namespace});
+            for (std::uint64_t ValueSize : BatchSizes)
+            {
+                uint64_t    KeyBase   = KeyGenerator();
+                std::string KeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, ValueSize);
+                IoHash      ValueKey  = IoHash::HashBuffer(KeyString.c_str(), KeyString.length());
+
+                Request.Requests.emplace_back(cacherequests::PutCacheValueRequest{.Key  = {.Bucket = m_Bucket, .Hash = ValueKey},
+                                                                                  .Body = CompressBlob(CreateRandomBlob(ValueSize))});
+            }
+            return Request;
+        });
+
+    auto GeneratePutCacheRecordRequest(
+        [this](std::span<std::uint64_t> BatchSizes, uint64_t RequestIndex, std::mt19937_64& KeyGenerator) {
+            cacherequests::PutCacheRecordsRequest Request({.AcceptMagic = kCbPkgMagic, .Namespace = m_Namespace});
+            uint64_t                              KeyBase         = KeyGenerator();
+            std::string                           RecordKeyString = fmt::format("{}-{}-{}", RequestIndex, KeyBase, BatchSizes.size());
+            IoHash                                RecordKey       = IoHash::HashBuffer(RecordKeyString.c_str(), RecordKeyString.length());
+
+            // One record per request; every size in the batch becomes one attachment of that record
+            Request.Requests.emplace_back(cacherequests::PutCacheRecordRequest{.Key = {.Bucket = m_Bucket, .Hash = RecordKey}});
+            for (std::uint64_t ValueSize : BatchSizes)
+            {
+                Request.Requests.back().Values.push_back({.Id = Oid::NewOid(), .Body = CompressBlob(CreateRandomBlob(ValueSize))});
+            }
+            return Request;
+        });
+
+    WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 2u), 2u)));
+    Latch            WorkLatch(1);
+
+    // Record mode: attachment count in [min-attachments, max-attachments]; a draw of 0 produces a plain cache value.
+    // Value mode (max-attachments == 0): 1..8 values per request.
+    std::uniform_int_distribution<uint32_t> SizeCountDistribution(m_MaxAttachmentCount > 0 ? m_MinAttachmentCount : 1,
+                                                                  m_MaxAttachmentCount > 0 ? m_MaxAttachmentCount : 8);
+
+    std::size_t Offset       = 0;
+    uint64_t    RequestIndex = 0;
+    while (Offset < Sizes.size())
+    {
+        size_t              SizeCount  = SizeCountDistribution(Generator);
+        std::span<uint64_t> BatchSizes = std::span<uint64_t>(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset));
+
+        // Everything that depends on loop-local state or on the shared generator is computed here and captured by
+        // value; the task runs after this iteration has finished and must not reference SizeCount or Generator.
+        const bool     CreateRecord = m_MaxAttachmentCount > 0 && SizeCount > 0;
+        const uint64_t KeySeed      = KeyDistribution(Generator);
+
+        WorkLatch.AddCount(1);
+        WorkerPool.ScheduleWork([&Http,
+                                 &WorkLatch,
+                                 &GeneratePutCacheRecordRequest,
+                                 &GeneratePutCacheValueRequest,
+                                 BatchSizes,
+                                 RequestIndex,
+                                 CreateRecord,
+                                 KeySeed]() {
+            auto            _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+            std::mt19937_64 KeyGenerator(KeySeed);
+            CbPackage       Package;
+            if (CreateRecord)
+            {
+                auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex, KeyGenerator);
+                // Keep the call outside the assert so it runs even if ZEN_ASSERT is ever compiled out
+                const bool Formatted = Request.Format(Package);
+                ZEN_ASSERT(Formatted);
+                ZEN_UNUSED(Formatted);
+            }
+            else
+            {
+                auto       Request   = GeneratePutCacheValueRequest(BatchSizes, RequestIndex, KeyGenerator);
+                const bool Formatted = Request.Format(Package);
+                ZEN_ASSERT(Formatted);
+                ZEN_UNUSED(Formatted);
+            }
+
+            if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response)
+            {
+                ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex)));
+            }
+        });
+        Offset += BatchSizes.size();
+        RequestIndex++;
+    }
+
+    // Release the initial count taken at construction, then wait for all scheduled requests
+    WorkLatch.CountDown();
+    while (!WorkLatch.Wait(1000))
+    {
+        ZEN_INFO("Creating data, {} requests remaining", WorkLatch.Remaining());
+    }
+
+    return 0;
+}
+
} // namespace zen
diff --git a/src/zen/cmds/cache_cmd.h b/src/zen/cmds/cache_cmd.h
index 80079c452..b8181d5a2 100644
--- a/src/zen/cmds/cache_cmd.h
+++ b/src/zen/cmds/cache_cmd.h
@@ -69,4 +69,28 @@ private:
std::string m_ValueKey;
};
+class CacheGenerateCommand : public CacheStoreCommand  // CLI command that uploads randomly generated cache values/records
+{
+public:
+ static constexpr char Name[] = "cache-gen";  // Command name; zen.cpp registers the command under this name
+ static constexpr char Description[] = "Generates cache values into a bucket";  // Description passed along when registering
+
+ CacheGenerateCommand();  // Registers the command line options on m_Options
+ ~CacheGenerateCommand();
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;  // Parses argv, then generates and posts the data
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{Name, Description};
+ std::string m_HostName;  // "hosturl" option; Run replaces it with the result of ResolveTargetHostSpec
+ std::string m_Namespace;  // "namespace" option
+ std::string m_Bucket;  // "bucket" option
+ uint64_t m_Count = 1;  // "count" option: number of cache values/records to generate
+
+ uint64_t m_MinSize = 0;  // "min-size" option; when this and m_MaxSize are both 0, Run picks default sizes
+ uint64_t m_MaxSize = 0;  // "max-size" option
+ uint32_t m_MinAttachmentCount = 0;  // "min-attachments" option
+ uint32_t m_MaxAttachmentCount = 0;  // "max-attachments" option; 0 means only plain cache values are created
+};
+
} // namespace zen
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 0f21de130..79338f6f3 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -290,6 +290,7 @@ main(int argc, char** argv)
AttachCommand AttachCmd;
BenchCommand BenchCmd;
CacheDetailsCommand CacheDetailsCmd;
+ CacheGenerateCommand CacheGenerateCmd;
CacheInfoCommand CacheInfoCmd;
CacheStatsCommand CacheStatsCmd;
CopyCommand CopyCmd;
@@ -343,6 +344,7 @@ main(int argc, char** argv)
{"bench", &BenchCmd, "Utility command for benchmarking"},
{"cache-details", &CacheDetailsCmd, "Details on cache"},
{"cache-info", &CacheInfoCmd, "Info on cache, namespace or bucket"},
+ {CacheGenerateCommand::Name, &CacheGenerateCmd, CacheGenerateCommand::Description},
{"cache-stats", &CacheStatsCmd, "Stats on cache"},
{"copy", &CopyCmd, "Copy file(s)"},
{"copy-state", &CopyStateCmd, "Copy zen server disk state"},