diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-19 17:42:01 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-06-13 12:04:43 +0200 |
| commit | 42c522788579fe2254d6c5f2feca3948b765feba (patch) | |
| tree | 794e4c0dc68d44d4daf174571341decf892505fb /zenserver-test/zenserver-test.cpp | |
| parent | WIP (diff) | |
| download | zen-42c522788579fe2254d6c5f2feca3948b765feba.tar.xz zen-42c522788579fe2254d6c5f2feca3948b765feba.zip | |
more WIP
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 257 |
1 files changed, 224 insertions, 33 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 239a1f768..0ffaf6b69 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -48,7 +48,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <random> #include <span> #include <thread> -#include <unordered_map> #if ZEN_PLATFORM_WINDOWS # include <ppl.h> @@ -67,6 +66,10 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include <zencore/testing.h> +# include <zencore/workthreadpool.h> +# include <zencore/scopeguard.h> +# include <unordered_set> +# include <unordered_map> #endif using namespace std::literals; @@ -2376,71 +2379,259 @@ TEST_CASE("zcache.rpc.upstream") ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); ZenServerInstance LocalServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port); + const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port); + const std::string Bucket = "AutoTestDummy"; - const size_t InNumKeys = 1; - const size_t InNumValues = 1; + const size_t PackageCount = 500; + const size_t NumAttachments = 7; + const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments; - auto GenerateValue = [](size_t KeyIndex, size_t ValueIndex) { - const size_t NumBytes = (ValueIndex + 1) * 10; + auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) { + const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t); IoBuffer ValueContents(NumBytes); - // Add N zeroed bytes where N corresponds to the value index times 10. - uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData()); - for (size_t ContentIndex = 0; ContentIndex < NumBytes; ++ContentIndex) + uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData()); + + std::mt19937 mt(static_cast<uint32_t>(DataIndexOffset + ValueIndex)); + uint32_t start = mt(); + uint32_t step = mt(); + for (size_t ContentIndex = 0; ContentIndex < NumBytes / sizeof(uint32_t); ++ContentIndex) { - DataPtr[ContentIndex] = static_cast<uint8_t>(KeyIndex + ContentIndex); + *reinterpret_cast<uint32_t*>(&DataPtr[ContentIndex * sizeof(uint32_t)]) = start; + start += step; } return ValueContents; }; - auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Values) { + auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) { CbObjectWriter Builder; Builder.BeginObject("key"sv); Builder << "Bucket"sv << Bucket << "Hash"sv << Hash; Builder.EndObject(); Builder.BeginArray("Values"sv); - for (size_t ValueIndex = 0; ValueIndex < Values.size(); ++ValueIndex) + for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex) { Builder.BeginObject(); - std::string ValueName = fmt::format("%d", ValueIndex); + std::string ValueName = fmt::format("%d", AttachmentIndex); Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash); - Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Values[ValueIndex].GetRawHash())); + Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash())); Builder.AddObjectId("Id"sv, ValueId); - Builder.AddInteger("RawSize"sv, Values[ValueIndex].GetRawSize()); + Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize()); Builder.EndObject(); } Builder.EndArray(); return Builder.Save(); }; - auto GenerateCacheRecords = [&GenerateValue, &GenerateCacheRecord](const std::string Bucket, size_t InNumKeys, size_t InNumValues) { - std::vector<CbObject> CacheRecords; - CacheRecords.reserve(InNumKeys); - for (size_t KeyIndex = 0; KeyIndex < InNumKeys; ++KeyIndex) + auto GeneratePackage = [&GenerateAttachment, + &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) { + IoHashStream Hasher; + + std::vector<CompressedBuffer> CompressedValues; + CompressedValues.reserve(NumAttachments); + for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex) { - IoHashStream Hasher; + IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex); + Hasher.Append(ValueContents.GetView()); + auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents)); + size_t CompressedSize = CompressedData.GetCompressedSize(); + CHECK(CompressedSize > 0); + CompressedValues.emplace_back(CompressedData); + } + + IoHash Key = Hasher.GetHash(); + + CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues); - std::vector<CompressedBuffer> Values; - Values.reserve(InNumValues); - for (size_t ValueIndex = 0; ValueIndex < InNumValues; ++ValueIndex) + CbPackage Package; + Package.SetObject(CacheRecord); + for (const auto& Value : CompressedValues) + { + Package.AddAttachment(zen::CbAttachment(Value)); + } + return Package; + }; + + // auto GeneratePackages = + // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) { + // std::vector<CbPackage> Packages; + // Packages.reserve(PackageCount); + // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + // { + // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset); + // Packages.emplace_back(Package); + // } + // return Packages; + // }; + + const size_t Iterations = 50; + std::vector<IoHash> AllPackages; + AllPackages.reserve(PackageCount * Iterations); + + WorkerThreadPool WorkerPool(64); + for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) + { + std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages; + SentPackages.resize(PackageCount); + + { + std::vector<uint64_t> Timers; + Timers.resize(PackageCount); + std::atomic<size_t> WorkCompleted = 0; + size_t ItemIndex = 0; + Stopwatch PutRemoteTimer; + const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] { + uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); + std::sort(Timers.begin(), Timers.end()); + uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; + ZEN_INFO("put #{} items in remote in {}, worst {}, best {}", + AllPackages.size(), + NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(WorstUS * 1000), + NiceLatencyNs(BestUS * 1000)); + }); + for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + { + WorkerPool.ScheduleWork([&GeneratePackage, + &PackageIndex, + &Iteration, + &UpstreamBaseUri, + &Bucket, + &SentPackages, + &WorkCompleted, + &Timers, + ItemIndex]() { + CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount); + zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView()); + zen::BinaryWriter MemStream; + Package.Save(MemStream); + + cpr::Response Result; + { + Stopwatch Timer; + const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); + Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)}, + cpr::Body{(const char*)MemStream.Data(), MemStream.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + } + CHECK(Result.status_code == 201); + std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments; + Attachments.reserve(Package.GetAttachments().size()); + for (const CbAttachment& Attachment : Package.GetAttachments()) + { + Attachments.insert(Attachment.AsCompressedBinary().GetRawHash()); + } + SentPackages[ItemIndex] = {Key, Attachments}; + WorkCompleted.fetch_add(1); + }); + ItemIndex++; + } + while (WorkCompleted < PackageCount) { - IoBuffer ValueContents = GenerateValue(KeyIndex, ValueIndex); - Hasher.Append(ValueContents.GetView()); - auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents)); - Values.emplace_back(CompressedData); + Sleep(1); } + } - IoHash Hash = Hasher.GetHash(); + { + std::vector<uint64_t> Timers; + Timers.resize(AllPackages.size()); + std::atomic<size_t> WorkCompleted = 0; + size_t ItemIndex = 0; + + Stopwatch HotLocalTimer; + const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] { + uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs(); + std::sort(Timers.begin(), Timers.end()); + uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; + ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}", + AllPackages.size(), + NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(WorstUS * 1000), + NiceLatencyNs(BestUS * 1000)); + }); + for (const IoHash& Key : AllPackages) + { + WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() { + cpr::Response Result; + { + Stopwatch Timer; + const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); + Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + } + CHECK(IsHttpSuccessCode(Result.status_code)); + WorkCompleted.fetch_add(1); + }); + ItemIndex++; + } + while (WorkCompleted < AllPackages.size()) + { + Sleep(1); + } + } - CbObject CacheRecord = GenerateCacheRecord(Hash, Bucket, Values); - CacheRecords.emplace_back(CacheRecord); + { + std::vector<uint64_t> Timers; + Timers.resize(SentPackages.size()); + std::atomic<size_t> WorkCompleted = 0; + size_t ItemIndex = 0; + + Stopwatch HotRemoteTimer; + const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] { + uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs(); + std::sort(Timers.begin(), Timers.end()); + uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; + ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}", + SentPackages.size(), + NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(WorstUS * 1000), + NiceLatencyNs(BestUS * 1000)); + }); + for (const auto& Entry : SentPackages) + { + WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() { + const IoHash& Key = Entry.first; + const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second; + // Get package with attachments + { + cpr::Response Result; + { + Stopwatch Timer; + const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); + Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + } + CHECK(IsHttpSuccessCode(Result.status_code)); + IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); + CbPackage ResponsePackage; + CHECK(ResponsePackage.TryLoad(Buffer)); + std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments; + for (const auto& Attachement : ResponsePackage.GetAttachments()) + { + ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash()); + } + CHECK(ReceivedAttachments == ExpectedAttachments); + } + WorkCompleted.fetch_add(1); + }); + ItemIndex++; + AllPackages.push_back(Entry.first); + } + while (WorkCompleted < SentPackages.size()) + { + Sleep(1); + } } - return CacheRecords; - }; - std::vector<CbObject> CacheRecords = GenerateCacheRecords(Bucket, InNumKeys, InNumValues); - CacheRecords.clear(); + } } # if ZEN_WITH_EXEC_SERVICES |