diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-26 00:40:36 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-06-13 12:06:27 +0200 |
| commit | 62e5a5a5537bfacd8e7dc8197b692763dce5bee4 (patch) | |
| tree | 274e676c89c760dc62ad722b2df716d1460b5911 /zenserver-test/zenserver-test.cpp | |
| parent | more WIP (diff) | |
| download | zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.tar.xz zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.zip | |
WIP
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 615 |
1 files changed, 442 insertions, 173 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0ffaf6b69..c91af976c 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -70,6 +70,65 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/scopeguard.h> # include <unordered_set> # include <unordered_map> + +template<> +struct fmt::formatter<cpr::Response> +{ + constexpr auto parse(format_parse_context& Ctx) -> decltype(Ctx.begin()) { return Ctx.end(); } + + template<typename FormatContext> + auto format(const cpr::Response& Response, FormatContext& Ctx) -> decltype(Ctx.out()) + { + using namespace std::literals; + + if (Response.status_code == 200 || Response.status_code == 201) + { + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed); + } + else + { + const auto It = Response.header.find("Content-Type"); + const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv; + + if (ContentType == "application/x-ue-cb"sv) + { + zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::CbObjectView Obj(Body.Data()); + zen::ExtendableStringBuilder<256> Sb; + std::string_view Json = Obj.ToJson(Sb).ToView(); + + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed, + Json, + Response.reason); + } + else + { + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reponse: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed, + Response.text, + Response.reason); + } + } + } +}; + #endif using namespace std::literals; @@ -733,9 +792,10 @@ namespace utils { .Args = std::move(Args)}; } - static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + static ZenConfig NewWithUpstream(uint16_t UpstreamPort, std::string Args = "") { - return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + return New(13337, + fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{} {}", UpstreamPort, Args)); } void Spawn(ZenServerInstance& Inst) @@ -2374,25 +2434,23 @@ TEST_CASE("zcache.rpc.upstream") { using namespace utils; - ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenConfig UpstreamCfg = ZenConfig::New(13338, "--http=asio"); ZenServerInstance UpstreamServer(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338, "--http=asio"); ZenServerInstance LocalServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - SpawnServer(LocalServer, LocalCfg); + UpstreamServer.WaitUntilReady(); - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port); - const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port); + const auto LocalBaseUri = fmt::format("http://localhost:{}", LocalCfg.Port); + const auto UpstreamBaseUri = fmt::format("http://localhost:{}", UpstreamCfg.Port); const std::string Bucket = "AutoTestDummy"; - const size_t PackageCount = 500; - const size_t NumAttachments = 7; - const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments; + const size_t NumAttachments = 7; - auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) { - const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t); + auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) { + const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t); IoBuffer ValueContents(NumBytes); uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData()); @@ -2408,139 +2466,295 @@ TEST_CASE("zcache.rpc.upstream") return ValueContents; }; - auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) { - CbObjectWriter Builder; - Builder.BeginObject("key"sv); - Builder << "Bucket"sv << Bucket << "Hash"sv << Hash; - Builder.EndObject(); - Builder.BeginArray("Values"sv); - for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex) - { - Builder.BeginObject(); - - std::string ValueName = fmt::format("%d", AttachmentIndex); - Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash); - Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash())); - Builder.AddObjectId("Id"sv, ValueId); - Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize()); - Builder.EndObject(); + auto AppendCacheRecord = + [](CbPackage& Package, CbWriter& Writer, const zen::CacheKey& CacheKey, const IoBuffer& Payload, CachePolicy RecordPolicy) { + CompressedBuffer Value = CompressedBuffer::Compress(SharedBuffer(Payload)); + CbAttachment Attachment(Value); + + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; + } + Writer.EndObject(); + Writer.BeginArray("Values"sv); + { + Writer.BeginObject(); + { + Writer.AddObjectId("Id"sv, Oid::NewOid()); + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Value.GetRawHash())); + Writer.AddInteger("RawSize"sv, Value.GetRawSize()); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + CacheRecordPolicy(RecordPolicy).Save(Writer); + } + Writer.EndObject(); + + Package.AddAttachment(Attachment); + }; + + auto CreateCachePackage = [&AppendCacheRecord](std::string_view Bucket, + const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& Attachments) -> CbPackage { + CbPackage Package; + CbWriter Writer; + + Writer.BeginObject(); + { + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (const auto& Entry : Attachments) + { + const IoHash& KeyHash = Entry.first; + const IoBuffer& Attachment = Entry.second; + const CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + AppendCacheRecord(Package, Writer, CacheKey, Attachment, CachePolicy::Default); + } + Writer.EndArray(); + } + Writer.EndObject(); } - Builder.EndArray(); - return Builder.Save(); + Writer.EndObject(); + Package.SetObject(Writer.Save().AsObject()); + + return Package; + }; + + struct CacheResult + { + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; - auto GeneratePackage = [&GenerateAttachment, - &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) { - IoHashStream Hasher; + auto InvokeObjectViewRPC = [](std::string_view BaseUri, const CbObjectView& Request) -> CacheResult { + ExtendableStringBuilder<256> Uri; + Uri << BaseUri << "/z$/$rpc"; + + BinaryWriter Body; + Request.CopyTo(Body); - std::vector<CompressedBuffer> CompressedValues; - CompressedValues.reserve(NumAttachments); - for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex) + cpr::Session Session; + Session.SetBody({}); + Session.SetHeader({}); + Session.SetConnectTimeout(0); + Session.SetTimeout(0); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + + cpr::Response Response = Session.Post(); + // ZEN_DEBUG("POST {}", Response); + + if (Response.error) { - IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex); - Hasher.Append(ValueContents.GetView()); - auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents)); - size_t CompressedSize = CompressedData.GetCompressedSize(); - CHECK(CompressedSize > 0); - CompressedValues.emplace_back(CompressedData); + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - IoHash Key = Hasher.GetHash(); + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues); + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + }; - CbPackage Package; - Package.SetObject(CacheRecord); - for (const auto& Value : CompressedValues) + auto InvokePackageRPC = [](std::string_view BaseUri, const CbPackage& Request) -> CacheResult { + ExtendableStringBuilder<256> Uri; + Uri << BaseUri << "/z$/$rpc"; + + SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); + + cpr::Session Session; + Session.SetBody({}); + Session.SetHeader({}); + // Session.SetConnectTimeout(0); + // Session.SetTimeout(0); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + + cpr::Response Response = Session.Post(); + // ZEN_DEBUG("POST {}", Response); + + if (Response.error) { - Package.AddAttachment(zen::CbAttachment(Value)); + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - return Package; - }; - // auto GeneratePackages = - // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) { - // std::vector<CbPackage> Packages; - // Packages.reserve(PackageCount); - // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) - // { - // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset); - // Packages.emplace_back(Package); - // } - // return Packages; - // }; - - const size_t Iterations = 50; - std::vector<IoHash> AllPackages; - AllPackages.reserve(PackageCount * Iterations); + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - WorkerThreadPool WorkerPool(64); - for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) - { - std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages; - SentPackages.resize(PackageCount); + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + }; + + auto CreateGetCacheRecord = [](std::string_view Bucket, + IoHash Key, + std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys, + zen::CachePolicy Policy, + BinaryWriter& Body) { + using namespace zen; + CbObjectWriter Request; + Request << "Method"sv + << "GetCacheRecords"sv; + Request.BeginObject("Params"sv); { - std::vector<uint64_t> Timers; - Timers.resize(PackageCount); - std::atomic<size_t> WorkCompleted = 0; - size_t ItemIndex = 0; - Stopwatch PutRemoteTimer; - const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] { - uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("put #{} items in remote in {}, worst {}, best {}", - AllPackages.size(), - NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); - }); - for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + Request << "DefaultPolicy"sv << WriteToString<128>(Policy); + Request.BeginArray("Requests"sv); + Request.BeginObject(); { - WorkerPool.ScheduleWork([&GeneratePackage, - &PackageIndex, - &Iteration, - &UpstreamBaseUri, - &Bucket, - &SentPackages, - &WorkCompleted, - &Timers, - ItemIndex]() { - CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount); - zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView()); - zen::BinaryWriter MemStream; - Package.Save(MemStream); - - cpr::Response Result; + Request.BeginObject("Key"sv); + { + Request << "Bucket"sv << Bucket << "Hash"sv << Key; + } + Request.EndObject(); + } + Request.EndObject(); + for (const auto& AttachmentKey : AttachmentKeys) + { + Request.BeginObject(); + { + Request.BeginObject("Key"sv); { - Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)}, - cpr::Body{(const char*)MemStream.Data(), MemStream.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + Request << "Bucket"sv << Bucket << "Hash"sv << AttachmentKey; } - CHECK(Result.status_code == 201); - std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments; - Attachments.reserve(Package.GetAttachments().size()); - for (const CbAttachment& Attachment : Package.GetAttachments()) + Request.EndObject(); + } + Request.EndObject(); + } + Request.EndArray(); + } + Request.EndObject(); + + Request.Save(Body); + }; + + auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::BinaryWriter MemStream; + Package.Save(MemStream); + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + const size_t PackageCount = 250; + const size_t Iterations = 20; + + std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages; + SentPackages.resize(PackageCount * Iterations); + + ZEN_INFO("generating #{} packages", Iterations * PackageCount); + WorkerThreadPool WorkerPool(32); + for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) + { + size_t ItemOffset = Iteration * PackageCount; + std::vector<uint64_t> Timers; + Timers.resize(PackageCount); + size_t ItemIndex = 0; + std::atomic<size_t> WorkCompleted = 0; + Stopwatch PutRemoteTimer; + const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] { + uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); + std::sort(Timers.begin(), Timers.end()); + uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; + ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}", + PackageCount, + NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(WorstUS * 1000), + NiceLatencyNs(BestUS * 1000), + Iteration + 1, + Iterations); + }); + for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + { + WorkerPool.ScheduleWork([&CreateCachePackage, + &GenerateAttachment, + &InvokePackageRPC, + &PackageIndex, + &Iteration, + &UpstreamBaseUri, + &Bucket, + &SentPackages, + &WorkCompleted, + &Timers, + ItemIndex, + ItemOffset]() { + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Attachments; + Attachments.reserve(NumAttachments); + for (size_t AttachmentIdx = 0; AttachmentIdx < NumAttachments; ++AttachmentIdx) + { + IoBuffer Attachment = + GenerateAttachment((Iteration * PackageCount * NumAttachments) + (PackageCount * NumAttachments), AttachmentIdx); + IoHash Key = IoHash::HashBuffer(Attachment.GetView()); + + Attachments.insert_or_assign(Key, Attachment); + } + + CbPackage Package = CreateCachePackage(Bucket, Attachments); + + { + Stopwatch Timer; + const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); + CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package); + if (!Result.Success) { - Attachments.insert(Attachment.AsCompressedBinary().GetRawHash()); + Sleep(1000); + Result = InvokePackageRPC(UpstreamBaseUri, Package); } - SentPackages[ItemIndex] = {Key, Attachments}; - WorkCompleted.fetch_add(1); - }); - ItemIndex++; - } - while (WorkCompleted < PackageCount) - { - Sleep(1); - } + CHECK(Result.Success); + } + + std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys; + AttachmentKeys.reserve(Attachments.size()); + for (const auto& Entry : Attachments) + { + AttachmentKeys.insert(Entry.first); + } + IoHash Key; + ((uint32_t*)(Key.Hash))[0] = (uint32_t)(Iteration * PackageCount + PackageIndex); + SentPackages[ItemIndex + ItemOffset] = make_pair(Key, AttachmentKeys); + WorkCompleted.fetch_add(1); + }); + ItemIndex++; } + while (WorkCompleted < PackageCount) + { + Sleep(1); + } + } + + SpawnServer(LocalServer, LocalCfg); + LocalServer.WaitUntilReady(); + struct FetchInfo + { + uint64_t FetchTime; + uint64_t FetchSize; + IoHash FetchKey; + }; + + std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > AllPackages; + AllPackages.reserve(PackageCount * Iterations); + for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) + { + if (!AllPackages.empty()) { - std::vector<uint64_t> Timers; + std::vector<FetchInfo> Timers; Timers.resize(AllPackages.size()); std::atomic<size_t> WorkCompleted = 0; size_t ItemIndex = 0; @@ -2548,28 +2762,60 @@ TEST_CASE("zcache.rpc.upstream") Stopwatch HotLocalTimer; const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] { uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}", + std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { + return LHS.FetchTime < RHS.FetchTime; + }); + FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; + FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; + FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; + ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", AllPackages.size(), NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); + Worst.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Worst.FetchSize), + NiceLatencyNs(Worst.FetchTime * 1000), + Best.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Best.FetchSize), + NiceLatencyNs(Best.FetchTime * 1000), + Median.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Median.FetchSize), + NiceLatencyNs(Median.FetchTime * 1000)); }); - for (const IoHash& Key : AllPackages) + for (const auto& Entry : AllPackages) { - WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() { - cpr::Response Result; - { - Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - } - CHECK(IsHttpSuccessCode(Result.status_code)); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() { + const IoHash& Key = Entry.first; + BinaryWriter Body; + CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body); + auto BodyView = CbObjectView(Body.GetData()); + + CacheResult Result; + { + Stopwatch Timer; + const auto _ = + MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + while (!Result.Success && Result.ErrorCode == 1) + { + Sleep(1000); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + } + CHECK(Result.Success); + } + + CbPackage Response; + CHECK(Response.TryLoad(Result.Response)); + CHECK(!Response.IsNull()); + uint64_t Size = Response.GetObject().GetSize(); + for (const auto& Attachment : Response.GetAttachments()) + { + Size += Attachment.AsCompressedBinary().GetCompressedSize(); + } + Timers[ItemIndex].FetchSize = Size; + Timers[ItemIndex].FetchKey = Key; + WorkCompleted.fetch_add(1); + }); ItemIndex++; } while (WorkCompleted < AllPackages.size()) @@ -2579,54 +2825,77 @@ TEST_CASE("zcache.rpc.upstream") } { - std::vector<uint64_t> Timers; - Timers.resize(SentPackages.size()); + std::vector<FetchInfo> Timers; + Timers.resize(PackageCount); std::atomic<size_t> WorkCompleted = 0; size_t ItemIndex = 0; Stopwatch HotRemoteTimer; - const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] { + const auto _ = MakeGuard([&HotRemoteTimer, &Timers] { uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}", - SentPackages.size(), + std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { + return LHS.FetchTime < RHS.FetchTime; + }); + FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; + FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; + FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; + ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", + Timers.size(), NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); + Worst.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Worst.FetchSize), + NiceLatencyNs(Worst.FetchTime * 1000), + Best.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Best.FetchSize), + NiceLatencyNs(Best.FetchTime * 1000), + Median.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Median.FetchSize), + NiceLatencyNs(Median.FetchTime * 1000)); }); - for (const auto& Entry : SentPackages) + for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) { - WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() { - const IoHash& Key = Entry.first; - const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second; - // Get package with attachments - { - cpr::Response Result; + const auto& Entry = SentPackages[Iteration * PackageCount + PackageIndex]; + + WorkerPool.ScheduleWork( + [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() { + const IoHash& Key = Entry.first; + + BinaryWriter Body; + CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body); + auto BodyView = CbObjectView(Body.GetData()); + + CacheResult Result; { Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + const auto _ = + MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + while (!Result.Success && Result.ErrorCode == 1) + { + Sleep(1000); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + } + CHECK(Result.Success); } - CHECK(IsHttpSuccessCode(Result.status_code)); - IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); - CbPackage ResponsePackage; - CHECK(ResponsePackage.TryLoad(Buffer)); - std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments; - for (const auto& Attachement : ResponsePackage.GetAttachments()) + + CbPackage Response; + CHECK(Response.TryLoad(Result.Response)); + CHECK(!Response.IsNull()); + uint64_t Size = Response.GetObject().GetSize(); + for (const auto& Attachment : Response.GetAttachments()) { - ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash()); + Size += Attachment.AsCompressedBinary().GetCompressedSize(); } - CHECK(ReceivedAttachments == ExpectedAttachments); - } - WorkCompleted.fetch_add(1); - }); + Timers[ItemIndex].FetchSize = Size; + Timers[ItemIndex].FetchKey = Key; + WorkCompleted.fetch_add(1); + }); + ItemIndex++; - AllPackages.push_back(Entry.first); + AllPackages.push_back(Entry); } - while (WorkCompleted < SentPackages.size()) + while (WorkCompleted < PackageCount) { Sleep(1); } |