aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-19 17:42:01 +0200
committerDan Engelbrecht <[email protected]>2022-06-13 12:04:43 +0200
commit42c522788579fe2254d6c5f2feca3948b765feba (patch)
tree794e4c0dc68d44d4daf174571341decf892505fb /zenserver-test/zenserver-test.cpp
parentWIP (diff)
downloadzen-42c522788579fe2254d6c5f2feca3948b765feba.tar.xz
zen-42c522788579fe2254d6c5f2feca3948b765feba.zip
more WIP
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp257
1 files changed, 224 insertions, 33 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 239a1f768..0ffaf6b69 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -48,7 +48,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <random>
#include <span>
#include <thread>
-#include <unordered_map>
#if ZEN_PLATFORM_WINDOWS
# include <ppl.h>
@@ -67,6 +66,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# define ZEN_TEST_WITH_RUNNER 1
# include <zencore/testing.h>
+# include <zencore/workthreadpool.h>
+# include <zencore/scopeguard.h>
+# include <unordered_set>
+# include <unordered_map>
#endif
using namespace std::literals;
@@ -2376,71 +2379,259 @@ TEST_CASE("zcache.rpc.upstream")
ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port);
+ const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port);
+
const std::string Bucket = "AutoTestDummy";
- const size_t InNumKeys = 1;
- const size_t InNumValues = 1;
+ const size_t PackageCount = 500;
+ const size_t NumAttachments = 7;
+ const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments;
- auto GenerateValue = [](size_t KeyIndex, size_t ValueIndex) {
- const size_t NumBytes = (ValueIndex + 1) * 10;
+ auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) {
+ const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t);
IoBuffer ValueContents(NumBytes);
- // Add N zeroed bytes where N corresponds to the value index times 10.
- uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData());
- for (size_t ContentIndex = 0; ContentIndex < NumBytes; ++ContentIndex)
+ uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData());
+
+ std::mt19937 mt(static_cast<uint32_t>(DataIndexOffset + ValueIndex));
+ uint32_t start = mt();
+ uint32_t step = mt();
+ for (size_t ContentIndex = 0; ContentIndex < NumBytes / sizeof(uint32_t); ++ContentIndex)
{
- DataPtr[ContentIndex] = static_cast<uint8_t>(KeyIndex + ContentIndex);
+ *reinterpret_cast<uint32_t*>(&DataPtr[ContentIndex * sizeof(uint32_t)]) = start;
+ start += step;
}
return ValueContents;
};
- auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Values) {
+ auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) {
CbObjectWriter Builder;
Builder.BeginObject("key"sv);
Builder << "Bucket"sv << Bucket << "Hash"sv << Hash;
Builder.EndObject();
Builder.BeginArray("Values"sv);
- for (size_t ValueIndex = 0; ValueIndex < Values.size(); ++ValueIndex)
+ for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex)
{
Builder.BeginObject();
- std::string ValueName = fmt::format("%d", ValueIndex);
+ std::string ValueName = fmt::format("%d", AttachmentIndex);
Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash);
- Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Values[ValueIndex].GetRawHash()));
+ Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash()));
Builder.AddObjectId("Id"sv, ValueId);
- Builder.AddInteger("RawSize"sv, Values[ValueIndex].GetRawSize());
+ Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize());
Builder.EndObject();
}
Builder.EndArray();
return Builder.Save();
};
- auto GenerateCacheRecords = [&GenerateValue, &GenerateCacheRecord](const std::string Bucket, size_t InNumKeys, size_t InNumValues) {
- std::vector<CbObject> CacheRecords;
- CacheRecords.reserve(InNumKeys);
- for (size_t KeyIndex = 0; KeyIndex < InNumKeys; ++KeyIndex)
+ auto GeneratePackage = [&GenerateAttachment,
+ &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) {
+ IoHashStream Hasher;
+
+ std::vector<CompressedBuffer> CompressedValues;
+ CompressedValues.reserve(NumAttachments);
+ for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex)
{
- IoHashStream Hasher;
+ IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex);
+ Hasher.Append(ValueContents.GetView());
+ auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents));
+ size_t CompressedSize = CompressedData.GetCompressedSize();
+ CHECK(CompressedSize > 0);
+ CompressedValues.emplace_back(CompressedData);
+ }
+
+ IoHash Key = Hasher.GetHash();
+
+ CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues);
- std::vector<CompressedBuffer> Values;
- Values.reserve(InNumValues);
- for (size_t ValueIndex = 0; ValueIndex < InNumValues; ++ValueIndex)
+ CbPackage Package;
+ Package.SetObject(CacheRecord);
+ for (const auto& Value : CompressedValues)
+ {
+ Package.AddAttachment(zen::CbAttachment(Value));
+ }
+ return Package;
+ };
+
+ // auto GeneratePackages =
+ // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) {
+ // std::vector<CbPackage> Packages;
+ // Packages.reserve(PackageCount);
+ // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ // {
+ // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset);
+ // Packages.emplace_back(Package);
+ // }
+ // return Packages;
+ // };
+
+ const size_t Iterations = 50;
+ std::vector<IoHash> AllPackages;
+ AllPackages.reserve(PackageCount * Iterations);
+
+ WorkerThreadPool WorkerPool(64);
+ for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
+ {
+ std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages;
+ SentPackages.resize(PackageCount);
+
+ {
+ std::vector<uint64_t> Timers;
+ Timers.resize(PackageCount);
+ std::atomic<size_t> WorkCompleted = 0;
+ size_t ItemIndex = 0;
+ Stopwatch PutRemoteTimer;
+ const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] {
+ uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
+ std::sort(Timers.begin(), Timers.end());
+ uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
+ ZEN_INFO("put #{} items in remote in {}, worst {}, best {}",
+ AllPackages.size(),
+ NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(WorstUS * 1000),
+ NiceLatencyNs(BestUS * 1000));
+ });
+ for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ {
+ WorkerPool.ScheduleWork([&GeneratePackage,
+ &PackageIndex,
+ &Iteration,
+ &UpstreamBaseUri,
+ &Bucket,
+ &SentPackages,
+ &WorkCompleted,
+ &Timers,
+ ItemIndex]() {
+ CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount);
+ zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView());
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ cpr::Response Result;
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
+ Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)},
+ cpr::Body{(const char*)MemStream.Data(), MemStream.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ }
+ CHECK(Result.status_code == 201);
+ std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments;
+ Attachments.reserve(Package.GetAttachments().size());
+ for (const CbAttachment& Attachment : Package.GetAttachments())
+ {
+ Attachments.insert(Attachment.AsCompressedBinary().GetRawHash());
+ }
+ SentPackages[ItemIndex] = {Key, Attachments};
+ WorkCompleted.fetch_add(1);
+ });
+ ItemIndex++;
+ }
+ while (WorkCompleted < PackageCount)
{
- IoBuffer ValueContents = GenerateValue(KeyIndex, ValueIndex);
- Hasher.Append(ValueContents.GetView());
- auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents));
- Values.emplace_back(CompressedData);
+ Sleep(1);
}
+ }
- IoHash Hash = Hasher.GetHash();
+ {
+ std::vector<uint64_t> Timers;
+ Timers.resize(AllPackages.size());
+ std::atomic<size_t> WorkCompleted = 0;
+ size_t ItemIndex = 0;
+
+ Stopwatch HotLocalTimer;
+ const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] {
+ uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs();
+ std::sort(Timers.begin(), Timers.end());
+ uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
+ ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}",
+ AllPackages.size(),
+ NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(WorstUS * 1000),
+ NiceLatencyNs(BestUS * 1000));
+ });
+ for (const IoHash& Key : AllPackages)
+ {
+ WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() {
+ cpr::Response Result;
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
+ Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ }
+ CHECK(IsHttpSuccessCode(Result.status_code));
+ WorkCompleted.fetch_add(1);
+ });
+ ItemIndex++;
+ }
+ while (WorkCompleted < AllPackages.size())
+ {
+ Sleep(1);
+ }
+ }
- CbObject CacheRecord = GenerateCacheRecord(Hash, Bucket, Values);
- CacheRecords.emplace_back(CacheRecord);
+ {
+ std::vector<uint64_t> Timers;
+ Timers.resize(SentPackages.size());
+ std::atomic<size_t> WorkCompleted = 0;
+ size_t ItemIndex = 0;
+
+ Stopwatch HotRemoteTimer;
+ const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] {
+ uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs();
+ std::sort(Timers.begin(), Timers.end());
+ uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
+ ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}",
+ SentPackages.size(),
+ NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(WorstUS * 1000),
+ NiceLatencyNs(BestUS * 1000));
+ });
+ for (const auto& Entry : SentPackages)
+ {
+ WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() {
+ const IoHash& Key = Entry.first;
+ const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second;
+ // Get package with attachments
+ {
+ cpr::Response Result;
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
+ Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ }
+ CHECK(IsHttpSuccessCode(Result.status_code));
+ IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
+ CbPackage ResponsePackage;
+ CHECK(ResponsePackage.TryLoad(Buffer));
+ std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments;
+ for (const auto& Attachement : ResponsePackage.GetAttachments())
+ {
+ ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash());
+ }
+ CHECK(ReceivedAttachments == ExpectedAttachments);
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ ItemIndex++;
+ AllPackages.push_back(Entry.first);
+ }
+ while (WorkCompleted < SentPackages.size())
+ {
+ Sleep(1);
+ }
}
- return CacheRecords;
- };
- std::vector<CbObject> CacheRecords = GenerateCacheRecords(Bucket, InNumKeys, InNumValues);
- CacheRecords.clear();
+ }
}
# if ZEN_WITH_EXEC_SERVICES