aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-26 00:40:36 +0200
committerDan Engelbrecht <[email protected]>2022-06-13 12:06:27 +0200
commit62e5a5a5537bfacd8e7dc8197b692763dce5bee4 (patch)
tree274e676c89c760dc62ad722b2df716d1460b5911 /zenserver-test/zenserver-test.cpp
parentmore WIP (diff)
downloadzen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.tar.xz
zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.zip
WIP
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp615
1 files changed, 442 insertions, 173 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 0ffaf6b69..c91af976c 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -70,6 +70,65 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/scopeguard.h>
# include <unordered_set>
# include <unordered_map>
+
+template<>
+struct fmt::formatter<cpr::Response>
+{
+ constexpr auto parse(format_parse_context& Ctx) -> decltype(Ctx.begin()) { return Ctx.end(); }
+
+ template<typename FormatContext>
+ auto format(const cpr::Response& Response, FormatContext& Ctx) -> decltype(Ctx.out())
+ {
+ using namespace std::literals;
+
+ if (Response.status_code == 200 || Response.status_code == 201)
+ {
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed);
+ }
+ else
+ {
+ const auto It = Response.header.find("Content-Type");
+ const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv;
+
+ if (ContentType == "application/x-ue-cb"sv)
+ {
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::CbObjectView Obj(Body.Data());
+ zen::ExtendableStringBuilder<256> Sb;
+ std::string_view Json = Obj.ToJson(Sb).ToView();
+
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed,
+ Json,
+ Response.reason);
+ }
+ else
+ {
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reponse: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed,
+ Response.text,
+ Response.reason);
+ }
+ }
+ }
+};
+
#endif
using namespace std::literals;
@@ -733,9 +792,10 @@ namespace utils {
.Args = std::move(Args)};
}
- static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
+ static ZenConfig NewWithUpstream(uint16_t UpstreamPort, std::string Args = "")
{
- return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ return New(13337,
+ fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{} {}", UpstreamPort, Args));
}
void Spawn(ZenServerInstance& Inst)
@@ -2374,25 +2434,23 @@ TEST_CASE("zcache.rpc.upstream")
{
using namespace utils;
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenConfig UpstreamCfg = ZenConfig::New(13338, "--http=asio");
ZenServerInstance UpstreamServer(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338, "--http=asio");
ZenServerInstance LocalServer(TestEnv);
SpawnServer(UpstreamServer, UpstreamCfg);
- SpawnServer(LocalServer, LocalCfg);
+ UpstreamServer.WaitUntilReady();
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port);
- const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port);
+ const auto LocalBaseUri = fmt::format("http://localhost:{}", LocalCfg.Port);
+ const auto UpstreamBaseUri = fmt::format("http://localhost:{}", UpstreamCfg.Port);
const std::string Bucket = "AutoTestDummy";
- const size_t PackageCount = 500;
- const size_t NumAttachments = 7;
- const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments;
+ const size_t NumAttachments = 7;
- auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) {
- const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t);
+ auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) {
+ const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t);
IoBuffer ValueContents(NumBytes);
uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData());
@@ -2408,139 +2466,295 @@ TEST_CASE("zcache.rpc.upstream")
return ValueContents;
};
- auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) {
- CbObjectWriter Builder;
- Builder.BeginObject("key"sv);
- Builder << "Bucket"sv << Bucket << "Hash"sv << Hash;
- Builder.EndObject();
- Builder.BeginArray("Values"sv);
- for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex)
- {
- Builder.BeginObject();
-
- std::string ValueName = fmt::format("%d", AttachmentIndex);
- Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash);
- Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash()));
- Builder.AddObjectId("Id"sv, ValueId);
- Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize());
- Builder.EndObject();
+ auto AppendCacheRecord =
+ [](CbPackage& Package, CbWriter& Writer, const zen::CacheKey& CacheKey, const IoBuffer& Payload, CachePolicy RecordPolicy) {
+ CompressedBuffer Value = CompressedBuffer::Compress(SharedBuffer(Payload));
+ CbAttachment Attachment(Value);
+
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
+ }
+ Writer.EndObject();
+ Writer.BeginArray("Values"sv);
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddObjectId("Id"sv, Oid::NewOid());
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Value.GetRawHash()));
+ Writer.AddInteger("RawSize"sv, Value.GetRawSize());
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ CacheRecordPolicy(RecordPolicy).Save(Writer);
+ }
+ Writer.EndObject();
+
+ Package.AddAttachment(Attachment);
+ };
+
+ auto CreateCachePackage = [&AppendCacheRecord](std::string_view Bucket,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& Attachments) -> CbPackage {
+ CbPackage Package;
+ CbWriter Writer;
+
+ Writer.BeginObject();
+ {
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (const auto& Entry : Attachments)
+ {
+ const IoHash& KeyHash = Entry.first;
+ const IoBuffer& Attachment = Entry.second;
+ const CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+ AppendCacheRecord(Package, Writer, CacheKey, Attachment, CachePolicy::Default);
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
}
- Builder.EndArray();
- return Builder.Save();
+ Writer.EndObject();
+ Package.SetObject(Writer.Save().AsObject());
+
+ return Package;
+ };
+
+ struct CacheResult
+ {
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
- auto GeneratePackage = [&GenerateAttachment,
- &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) {
- IoHashStream Hasher;
+ auto InvokeObjectViewRPC = [](std::string_view BaseUri, const CbObjectView& Request) -> CacheResult {
+ ExtendableStringBuilder<256> Uri;
+ Uri << BaseUri << "/z$/$rpc";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
- std::vector<CompressedBuffer> CompressedValues;
- CompressedValues.reserve(NumAttachments);
- for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex)
+ cpr::Session Session;
+ Session.SetBody({});
+ Session.SetHeader({});
+ Session.SetConnectTimeout(0);
+ Session.SetTimeout(0);
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ // ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
{
- IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex);
- Hasher.Append(ValueContents.GetView());
- auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents));
- size_t CompressedSize = CompressedData.GetCompressedSize();
- CHECK(CompressedSize > 0);
- CompressedValues.emplace_back(CompressedData);
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- IoHash Key = Hasher.GetHash();
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues);
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ };
- CbPackage Package;
- Package.SetObject(CacheRecord);
- for (const auto& Value : CompressedValues)
+ auto InvokePackageRPC = [](std::string_view BaseUri, const CbPackage& Request) -> CacheResult {
+ ExtendableStringBuilder<256> Uri;
+ Uri << BaseUri << "/z$/$rpc";
+
+ SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
+
+ cpr::Session Session;
+ Session.SetBody({});
+ Session.SetHeader({});
+ // Session.SetConnectTimeout(0);
+ // Session.SetTimeout(0);
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ // ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
{
- Package.AddAttachment(zen::CbAttachment(Value));
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- return Package;
- };
- // auto GeneratePackages =
- // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) {
- // std::vector<CbPackage> Packages;
- // Packages.reserve(PackageCount);
- // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
- // {
- // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset);
- // Packages.emplace_back(Package);
- // }
- // return Packages;
- // };
-
- const size_t Iterations = 50;
- std::vector<IoHash> AllPackages;
- AllPackages.reserve(PackageCount * Iterations);
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- WorkerThreadPool WorkerPool(64);
- for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
- {
- std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages;
- SentPackages.resize(PackageCount);
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ };
+
+ auto CreateGetCacheRecord = [](std::string_view Bucket,
+ IoHash Key,
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys,
+ zen::CachePolicy Policy,
+ BinaryWriter& Body) {
+ using namespace zen;
+ CbObjectWriter Request;
+ Request << "Method"sv
+ << "GetCacheRecords"sv;
+ Request.BeginObject("Params"sv);
{
- std::vector<uint64_t> Timers;
- Timers.resize(PackageCount);
- std::atomic<size_t> WorkCompleted = 0;
- size_t ItemIndex = 0;
- Stopwatch PutRemoteTimer;
- const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] {
- uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("put #{} items in remote in {}, worst {}, best {}",
- AllPackages.size(),
- NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
- });
- for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ Request << "DefaultPolicy"sv << WriteToString<128>(Policy);
+ Request.BeginArray("Requests"sv);
+ Request.BeginObject();
{
- WorkerPool.ScheduleWork([&GeneratePackage,
- &PackageIndex,
- &Iteration,
- &UpstreamBaseUri,
- &Bucket,
- &SentPackages,
- &WorkCompleted,
- &Timers,
- ItemIndex]() {
- CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount);
- zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView());
- zen::BinaryWriter MemStream;
- Package.Save(MemStream);
-
- cpr::Response Result;
+ Request.BeginObject("Key"sv);
+ {
+ Request << "Bucket"sv << Bucket << "Hash"sv << Key;
+ }
+ Request.EndObject();
+ }
+ Request.EndObject();
+ for (const auto& AttachmentKey : AttachmentKeys)
+ {
+ Request.BeginObject();
+ {
+ Request.BeginObject("Key"sv);
{
- Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)},
- cpr::Body{(const char*)MemStream.Data(), MemStream.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ Request << "Bucket"sv << Bucket << "Hash"sv << AttachmentKey;
}
- CHECK(Result.status_code == 201);
- std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments;
- Attachments.reserve(Package.GetAttachments().size());
- for (const CbAttachment& Attachment : Package.GetAttachments())
+ Request.EndObject();
+ }
+ Request.EndObject();
+ }
+ Request.EndArray();
+ }
+ Request.EndObject();
+
+ Request.Save(Body);
+ };
+
+ auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ const size_t PackageCount = 250;
+ const size_t Iterations = 20;
+
+ std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages;
+ SentPackages.resize(PackageCount * Iterations);
+
+ ZEN_INFO("generating #{} packages", Iterations * PackageCount);
+ WorkerThreadPool WorkerPool(32);
+ for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
+ {
+ size_t ItemOffset = Iteration * PackageCount;
+ std::vector<uint64_t> Timers;
+ Timers.resize(PackageCount);
+ size_t ItemIndex = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Stopwatch PutRemoteTimer;
+ const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] {
+ uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
+ std::sort(Timers.begin(), Timers.end());
+ uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
+ ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}",
+ PackageCount,
+ NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(WorstUS * 1000),
+ NiceLatencyNs(BestUS * 1000),
+ Iteration + 1,
+ Iterations);
+ });
+ for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ {
+ WorkerPool.ScheduleWork([&CreateCachePackage,
+ &GenerateAttachment,
+ &InvokePackageRPC,
+ &PackageIndex,
+ &Iteration,
+ &UpstreamBaseUri,
+ &Bucket,
+ &SentPackages,
+ &WorkCompleted,
+ &Timers,
+ ItemIndex,
+ ItemOffset]() {
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Attachments;
+ Attachments.reserve(NumAttachments);
+ for (size_t AttachmentIdx = 0; AttachmentIdx < NumAttachments; ++AttachmentIdx)
+ {
+ IoBuffer Attachment =
+ GenerateAttachment((Iteration * PackageCount * NumAttachments) + (PackageCount * NumAttachments), AttachmentIdx);
+ IoHash Key = IoHash::HashBuffer(Attachment.GetView());
+
+ Attachments.insert_or_assign(Key, Attachment);
+ }
+
+ CbPackage Package = CreateCachePackage(Bucket, Attachments);
+
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
+ CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package);
+ if (!Result.Success)
{
- Attachments.insert(Attachment.AsCompressedBinary().GetRawHash());
+ Sleep(1000);
+ Result = InvokePackageRPC(UpstreamBaseUri, Package);
}
- SentPackages[ItemIndex] = {Key, Attachments};
- WorkCompleted.fetch_add(1);
- });
- ItemIndex++;
- }
- while (WorkCompleted < PackageCount)
- {
- Sleep(1);
- }
+ CHECK(Result.Success);
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys;
+ AttachmentKeys.reserve(Attachments.size());
+ for (const auto& Entry : Attachments)
+ {
+ AttachmentKeys.insert(Entry.first);
+ }
+ IoHash Key;
+ ((uint32_t*)(Key.Hash))[0] = (uint32_t)(Iteration * PackageCount + PackageIndex);
+ SentPackages[ItemIndex + ItemOffset] = make_pair(Key, AttachmentKeys);
+ WorkCompleted.fetch_add(1);
+ });
+ ItemIndex++;
}
+ while (WorkCompleted < PackageCount)
+ {
+ Sleep(1);
+ }
+ }
+
+ SpawnServer(LocalServer, LocalCfg);
+ LocalServer.WaitUntilReady();
+ struct FetchInfo
+ {
+ uint64_t FetchTime;
+ uint64_t FetchSize;
+ IoHash FetchKey;
+ };
+
+ std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > AllPackages;
+ AllPackages.reserve(PackageCount * Iterations);
+ for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
+ {
+ if (!AllPackages.empty())
{
- std::vector<uint64_t> Timers;
+ std::vector<FetchInfo> Timers;
Timers.resize(AllPackages.size());
std::atomic<size_t> WorkCompleted = 0;
size_t ItemIndex = 0;
@@ -2548,28 +2762,60 @@ TEST_CASE("zcache.rpc.upstream")
Stopwatch HotLocalTimer;
const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] {
uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}",
+ std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
+ return LHS.FetchTime < RHS.FetchTime;
+ });
+ FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
+ FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
+ FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
+ ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
AllPackages.size(),
NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
+ Worst.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Worst.FetchSize),
+ NiceLatencyNs(Worst.FetchTime * 1000),
+ Best.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Best.FetchSize),
+ NiceLatencyNs(Best.FetchTime * 1000),
+ Median.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Median.FetchSize),
+ NiceLatencyNs(Median.FetchTime * 1000));
});
- for (const IoHash& Key : AllPackages)
+ for (const auto& Entry : AllPackages)
{
- WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() {
- cpr::Response Result;
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- }
- CHECK(IsHttpSuccessCode(Result.status_code));
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() {
+ const IoHash& Key = Entry.first;
+ BinaryWriter Body;
+ CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body);
+ auto BodyView = CbObjectView(Body.GetData());
+
+ CacheResult Result;
+ {
+ Stopwatch Timer;
+ const auto _ =
+ MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ while (!Result.Success && Result.ErrorCode == 1)
+ {
+ Sleep(1000);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ }
+ CHECK(Result.Success);
+ }
+
+ CbPackage Response;
+ CHECK(Response.TryLoad(Result.Response));
+ CHECK(!Response.IsNull());
+ uint64_t Size = Response.GetObject().GetSize();
+ for (const auto& Attachment : Response.GetAttachments())
+ {
+ Size += Attachment.AsCompressedBinary().GetCompressedSize();
+ }
+ Timers[ItemIndex].FetchSize = Size;
+ Timers[ItemIndex].FetchKey = Key;
+ WorkCompleted.fetch_add(1);
+ });
ItemIndex++;
}
while (WorkCompleted < AllPackages.size())
@@ -2579,54 +2825,77 @@ TEST_CASE("zcache.rpc.upstream")
}
{
- std::vector<uint64_t> Timers;
- Timers.resize(SentPackages.size());
+ std::vector<FetchInfo> Timers;
+ Timers.resize(PackageCount);
std::atomic<size_t> WorkCompleted = 0;
size_t ItemIndex = 0;
Stopwatch HotRemoteTimer;
- const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] {
+ const auto _ = MakeGuard([&HotRemoteTimer, &Timers] {
uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}",
- SentPackages.size(),
+ std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
+ return LHS.FetchTime < RHS.FetchTime;
+ });
+ FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
+ FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
+ FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
+ ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
+ Timers.size(),
NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
+ Worst.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Worst.FetchSize),
+ NiceLatencyNs(Worst.FetchTime * 1000),
+ Best.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Best.FetchSize),
+ NiceLatencyNs(Best.FetchTime * 1000),
+ Median.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Median.FetchSize),
+ NiceLatencyNs(Median.FetchTime * 1000));
});
- for (const auto& Entry : SentPackages)
+ for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
{
- WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() {
- const IoHash& Key = Entry.first;
- const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second;
- // Get package with attachments
- {
- cpr::Response Result;
+ const auto& Entry = SentPackages[Iteration * PackageCount + PackageIndex];
+
+ WorkerPool.ScheduleWork(
+ [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() {
+ const IoHash& Key = Entry.first;
+
+ BinaryWriter Body;
+ CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body);
+ auto BodyView = CbObjectView(Body.GetData());
+
+ CacheResult Result;
{
Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ const auto _ =
+ MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ while (!Result.Success && Result.ErrorCode == 1)
+ {
+ Sleep(1000);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ }
+ CHECK(Result.Success);
}
- CHECK(IsHttpSuccessCode(Result.status_code));
- IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
- CbPackage ResponsePackage;
- CHECK(ResponsePackage.TryLoad(Buffer));
- std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments;
- for (const auto& Attachement : ResponsePackage.GetAttachments())
+
+ CbPackage Response;
+ CHECK(Response.TryLoad(Result.Response));
+ CHECK(!Response.IsNull());
+ uint64_t Size = Response.GetObject().GetSize();
+ for (const auto& Attachment : Response.GetAttachments())
{
- ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash());
+ Size += Attachment.AsCompressedBinary().GetCompressedSize();
}
- CHECK(ReceivedAttachments == ExpectedAttachments);
- }
- WorkCompleted.fetch_add(1);
- });
+ Timers[ItemIndex].FetchSize = Size;
+ Timers[ItemIndex].FetchKey = Key;
+ WorkCompleted.fetch_add(1);
+ });
+
ItemIndex++;
- AllPackages.push_back(Entry.first);
+ AllPackages.push_back(Entry);
}
- while (WorkCompleted < SentPackages.size())
+ while (WorkCompleted < PackageCount)
{
Sleep(1);
}