diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-28 14:37:36 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-06-13 12:07:23 +0200 |
| commit | 172f54166b9c8fd259814fcf4377210bd523c9ce (patch) | |
| tree | 84f5d4083c1206108a4f1147cf80759821776516 | |
| parent | TMP: run tests in release build (diff) | |
| download | zen-de/upstream-server-tests.tar.xz zen-de/upstream-server-tests.zip | |
cleanupde/upstream-server-tests
| -rw-r--r-- | zencore/include/zencore/blockingqueue.h | 20 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 136 |
2 files changed, 96 insertions, 60 deletions
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h index f92df5a54..4e2687003 100644 --- a/zencore/include/zencore/blockingqueue.h +++ b/zencore/include/zencore/blockingqueue.h @@ -36,18 +36,20 @@ public: } std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) + if (m_Queue.empty()) { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + } + if (m_Queue.empty()) + { + return false; } - return false; + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + m_Size--; + + return true; } void CompleteAdding() diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index c91af976c..8c7b42d2d 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -42,6 +42,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #undef GetObject ZEN_THIRD_PARTY_INCLUDES_END +#include <algorithm> #include <atomic> #include <filesystem> #include <map> @@ -2450,7 +2451,7 @@ TEST_CASE("zcache.rpc.upstream") const size_t NumAttachments = 7; auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) { - const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t); + const size_t NumBytes = (1ull << (ValueIndex + 1) * 3); IoBuffer ValueContents(NumBytes); uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData()); @@ -2587,7 +2588,19 @@ TEST_CASE("zcache.rpc.upstream") Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); - Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + size_t Offset = 0; + auto MessageReader = [&Message, &Offset](char* buffer, size_t& size, intptr_t userdata) { + SharedBuffer* MessagePtr = reinterpret_cast<SharedBuffer*>(userdata); + size_t DataSize = MessagePtr->GetSize(); + const char* DataPtr = reinterpret_cast<const char*>(MessagePtr->GetData()); + size = std::min(size, DataSize - Offset); + memcpy(buffer, &DataPtr[Offset], size); + Offset += size; + return true; + }; + Session.SetOption( + cpr::ReadCallback{gsl::narrow<cpr::cpr_off_t>(Message.GetSize()), MessageReader, reinterpret_cast<intptr_t>(&Message)}); + // Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); cpr::Response Response = Session.Post(); // ZEN_DEBUG("POST {}", Response); @@ -2645,14 +2658,8 @@ TEST_CASE("zcache.rpc.upstream") Request.Save(Body); }; - auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { - zen::BinaryWriter MemStream; - Package.Save(MemStream); - return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - }; - - const size_t PackageCount = 250; - const size_t Iterations = 20; + const size_t PackageCount = 400; + const size_t Iterations = 10; std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages; SentPackages.resize(PackageCount * Iterations); @@ -2668,17 +2675,24 @@ TEST_CASE("zcache.rpc.upstream") std::atomic<size_t> WorkCompleted = 0; Stopwatch PutRemoteTimer; const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] { - uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); + uint64_t ElapsedNS = PutRemoteTimer.GetElapsedTimeUs() * 1000; std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}", + uint64_t WorstNS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestNS = Timers.size() > 0 ? Timers.front() : 0; + uint64_t AvgNs = 0; + for (uint64_t T : Timers) + { + AvgNs += T; + } + AvgNs /= Timers.size(); + ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}, avg {}", PackageCount, - NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000), + NiceLatencyNs(ElapsedNS), + NiceLatencyNs(WorstNS), + NiceLatencyNs(BestNS), Iteration + 1, - Iterations); + Iterations, + NiceLatencyNs(AvgNs)); }); for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) { @@ -2707,16 +2721,19 @@ TEST_CASE("zcache.rpc.upstream") CbPackage Package = CreateCachePackage(Bucket, Attachments); + CacheResult Result; + Result.Success = false; { - Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package); - if (!Result.Success) + while (!Result.Success) { - Sleep(1000); Result = InvokePackageRPC(UpstreamBaseUri, Package); + if (!Result.Success) + { + Sleep(1000); + } } CHECK(Result.Success); + Timers[ItemIndex] = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0); } std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys; @@ -2743,7 +2760,7 @@ TEST_CASE("zcache.rpc.upstream") struct FetchInfo { - uint64_t FetchTime; + uint64_t FetchTimeNS; uint64_t FetchSize; IoHash FetchKey; }; @@ -2752,6 +2769,7 @@ TEST_CASE("zcache.rpc.upstream") AllPackages.reserve(PackageCount * Iterations); for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) { + // Hot Local, Hot Remote if (!AllPackages.empty()) { std::vector<FetchInfo> Timers; @@ -2761,25 +2779,32 @@ TEST_CASE("zcache.rpc.upstream") Stopwatch HotLocalTimer; const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] { - uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs(); + uint64_t ElapsedNS = HotLocalTimer.GetElapsedTimeUs() * 1000; std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { - return LHS.FetchTime < RHS.FetchTime; + return LHS.FetchTimeNS < RHS.FetchTimeNS; }); FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; - ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", + uint64_t AvgNs = 0; + for (const FetchInfo& F : Timers) + { + AvgNs += F.FetchTimeNS; + } + AvgNs /= Timers.size(); + ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}, avg {}", AllPackages.size(), - NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(ElapsedNS), Worst.FetchKey.ToHexString().substr(0, 6), NiceBytes(Worst.FetchSize), - NiceLatencyNs(Worst.FetchTime * 1000), + NiceLatencyNs(Worst.FetchTimeNS), Best.FetchKey.ToHexString().substr(0, 6), NiceBytes(Best.FetchSize), - NiceLatencyNs(Best.FetchTime * 1000), + NiceLatencyNs(Best.FetchTimeNS), Median.FetchKey.ToHexString().substr(0, 6), NiceBytes(Median.FetchSize), - NiceLatencyNs(Median.FetchTime * 1000)); + NiceLatencyNs(Median.FetchTimeNS), + NiceLatencyNs(AvgNs)); }); for (const auto& Entry : AllPackages) { @@ -2791,17 +2816,18 @@ TEST_CASE("zcache.rpc.upstream") auto BodyView = CbObjectView(Body.GetData()); CacheResult Result; + Result.Success = false; { - Stopwatch Timer; - const auto _ = - MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); - Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); - while (!Result.Success && Result.ErrorCode == 1) + while (!Result.Success) { - Sleep(1000); Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + if (!Result.Success) + { + Sleep(1000); + } } CHECK(Result.Success); + Timers[ItemIndex].FetchTimeNS = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0); } CbPackage Response; @@ -2824,6 +2850,7 @@ TEST_CASE("zcache.rpc.upstream") } } + // Cold local, Hot remote { std::vector<FetchInfo> Timers; Timers.resize(PackageCount); @@ -2832,25 +2859,32 @@ TEST_CASE("zcache.rpc.upstream") Stopwatch HotRemoteTimer; const auto _ = MakeGuard([&HotRemoteTimer, &Timers] { - uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs(); + uint64_t ElapsedNS = HotRemoteTimer.GetElapsedTimeUs() * 1000; std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { - return LHS.FetchTime < RHS.FetchTime; + return LHS.FetchTimeNS < RHS.FetchTimeNS; }); FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; - ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", + uint64_t AvgNs = 0; + for (const FetchInfo& F : Timers) + { + AvgNs += F.FetchTimeNS; + } + AvgNs /= Timers.size(); + ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}, avg {}", Timers.size(), - NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(ElapsedNS), Worst.FetchKey.ToHexString().substr(0, 6), NiceBytes(Worst.FetchSize), - NiceLatencyNs(Worst.FetchTime * 1000), + NiceLatencyNs(Worst.FetchTimeNS), Best.FetchKey.ToHexString().substr(0, 6), NiceBytes(Best.FetchSize), - NiceLatencyNs(Best.FetchTime * 1000), + NiceLatencyNs(Best.FetchTimeNS), Median.FetchKey.ToHexString().substr(0, 6), NiceBytes(Median.FetchSize), - NiceLatencyNs(Median.FetchTime * 1000)); + NiceLatencyNs(Median.FetchTimeNS), + NiceLatencyNs(AvgNs)); }); for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) { @@ -2865,18 +2899,18 @@ TEST_CASE("zcache.rpc.upstream") auto BodyView = CbObjectView(Body.GetData()); CacheResult Result; + Result.Success = false; { - Stopwatch Timer; - const auto _ = - MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); - Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); - Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); - while (!Result.Success && Result.ErrorCode == 1) + while (!Result.Success) { - Sleep(1000); Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + if (!Result.Success) + { + Sleep(1000); + } } CHECK(Result.Success); + Timers[ItemIndex].FetchTimeNS = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0); } CbPackage Response; |