aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-28 14:37:36 +0200
committerDan Engelbrecht <[email protected]>2022-06-13 12:07:23 +0200
commit172f54166b9c8fd259814fcf4377210bd523c9ce (patch)
tree84f5d4083c1206108a4f1147cf80759821776516
parentTMP: run tests in release build (diff)
downloadzen-de/upstream-server-tests.tar.xz
zen-de/upstream-server-tests.zip
-rw-r--r--zencore/include/zencore/blockingqueue.h20
-rw-r--r--zenserver-test/zenserver-test.cpp136
2 files changed, 96 insertions, 60 deletions
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h
index f92df5a54..4e2687003 100644
--- a/zencore/include/zencore/blockingqueue.h
+++ b/zencore/include/zencore/blockingqueue.h
@@ -36,18 +36,20 @@ public:
}
std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
- if (!m_Queue.empty())
+ if (m_Queue.empty())
{
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- m_Size--;
-
- return true;
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
+ }
+ if (m_Queue.empty())
+ {
+ return false;
}
- return false;
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+ m_Size--;
+
+ return true;
}
void CompleteAdding()
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index c91af976c..8c7b42d2d 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -42,6 +42,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
#undef GetObject
ZEN_THIRD_PARTY_INCLUDES_END
+#include <algorithm>
#include <atomic>
#include <filesystem>
#include <map>
@@ -2450,7 +2451,7 @@ TEST_CASE("zcache.rpc.upstream")
const size_t NumAttachments = 7;
auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) {
- const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t);
+ const size_t NumBytes = (1ull << (ValueIndex + 1) * 3);
IoBuffer ValueContents(NumBytes);
uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData());
@@ -2587,7 +2588,19 @@ TEST_CASE("zcache.rpc.upstream")
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
- Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+ size_t Offset = 0;
+ auto MessageReader = [&Message, &Offset](char* buffer, size_t& size, intptr_t userdata) {
+ SharedBuffer* MessagePtr = reinterpret_cast<SharedBuffer*>(userdata);
+ size_t DataSize = MessagePtr->GetSize();
+ const char* DataPtr = reinterpret_cast<const char*>(MessagePtr->GetData());
+ size = std::min(size, DataSize - Offset);
+ memcpy(buffer, &DataPtr[Offset], size);
+ Offset += size;
+ return true;
+ };
+ Session.SetOption(
+ cpr::ReadCallback{gsl::narrow<cpr::cpr_off_t>(Message.GetSize()), MessageReader, reinterpret_cast<intptr_t>(&Message)});
+ // Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
cpr::Response Response = Session.Post();
// ZEN_DEBUG("POST {}", Response);
@@ -2645,14 +2658,8 @@ TEST_CASE("zcache.rpc.upstream")
Request.Save(Body);
};
- auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
- zen::BinaryWriter MemStream;
- Package.Save(MemStream);
- return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- };
-
- const size_t PackageCount = 250;
- const size_t Iterations = 20;
+ const size_t PackageCount = 400;
+ const size_t Iterations = 10;
std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages;
SentPackages.resize(PackageCount * Iterations);
@@ -2668,17 +2675,24 @@ TEST_CASE("zcache.rpc.upstream")
std::atomic<size_t> WorkCompleted = 0;
Stopwatch PutRemoteTimer;
const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] {
- uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
+ uint64_t ElapsedNS = PutRemoteTimer.GetElapsedTimeUs() * 1000;
std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}",
+ uint64_t WorstNS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestNS = Timers.size() > 0 ? Timers.front() : 0;
+ uint64_t AvgNs = 0;
+ for (uint64_t T : Timers)
+ {
+ AvgNs += T;
+ }
+ AvgNs /= Timers.size();
+ ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}, avg {}",
PackageCount,
- NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000),
+ NiceLatencyNs(ElapsedNS),
+ NiceLatencyNs(WorstNS),
+ NiceLatencyNs(BestNS),
Iteration + 1,
- Iterations);
+ Iterations,
+ NiceLatencyNs(AvgNs));
});
for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
{
@@ -2707,16 +2721,19 @@ TEST_CASE("zcache.rpc.upstream")
CbPackage Package = CreateCachePackage(Bucket, Attachments);
+ CacheResult Result;
+ Result.Success = false;
{
- Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package);
- if (!Result.Success)
+ while (!Result.Success)
{
- Sleep(1000);
Result = InvokePackageRPC(UpstreamBaseUri, Package);
+ if (!Result.Success)
+ {
+ Sleep(1000);
+ }
}
CHECK(Result.Success);
+ Timers[ItemIndex] = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0);
}
std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys;
@@ -2743,7 +2760,7 @@ TEST_CASE("zcache.rpc.upstream")
struct FetchInfo
{
- uint64_t FetchTime;
+ uint64_t FetchTimeNS;
uint64_t FetchSize;
IoHash FetchKey;
};
@@ -2752,6 +2769,7 @@ TEST_CASE("zcache.rpc.upstream")
AllPackages.reserve(PackageCount * Iterations);
for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
{
+ // Hot Local, Hot Remote
if (!AllPackages.empty())
{
std::vector<FetchInfo> Timers;
@@ -2761,25 +2779,32 @@ TEST_CASE("zcache.rpc.upstream")
Stopwatch HotLocalTimer;
const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] {
- uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs();
+ uint64_t ElapsedNS = HotLocalTimer.GetElapsedTimeUs() * 1000;
std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
- return LHS.FetchTime < RHS.FetchTime;
+ return LHS.FetchTimeNS < RHS.FetchTimeNS;
});
FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
- ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
+ uint64_t AvgNs = 0;
+ for (const FetchInfo& F : Timers)
+ {
+ AvgNs += F.FetchTimeNS;
+ }
+ AvgNs /= Timers.size();
+ ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}, avg {}",
AllPackages.size(),
- NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(ElapsedNS),
Worst.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Worst.FetchSize),
- NiceLatencyNs(Worst.FetchTime * 1000),
+ NiceLatencyNs(Worst.FetchTimeNS),
Best.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Best.FetchSize),
- NiceLatencyNs(Best.FetchTime * 1000),
+ NiceLatencyNs(Best.FetchTimeNS),
Median.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Median.FetchSize),
- NiceLatencyNs(Median.FetchTime * 1000));
+ NiceLatencyNs(Median.FetchTimeNS),
+ NiceLatencyNs(AvgNs));
});
for (const auto& Entry : AllPackages)
{
@@ -2791,17 +2816,18 @@ TEST_CASE("zcache.rpc.upstream")
auto BodyView = CbObjectView(Body.GetData());
CacheResult Result;
+ Result.Success = false;
{
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
- Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
- while (!Result.Success && Result.ErrorCode == 1)
+ while (!Result.Success)
{
- Sleep(1000);
Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ if (!Result.Success)
+ {
+ Sleep(1000);
+ }
}
CHECK(Result.Success);
+ Timers[ItemIndex].FetchTimeNS = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0);
}
CbPackage Response;
@@ -2824,6 +2850,7 @@ TEST_CASE("zcache.rpc.upstream")
}
}
+ // Cold local, Hot remote
{
std::vector<FetchInfo> Timers;
Timers.resize(PackageCount);
@@ -2832,25 +2859,32 @@ TEST_CASE("zcache.rpc.upstream")
Stopwatch HotRemoteTimer;
const auto _ = MakeGuard([&HotRemoteTimer, &Timers] {
- uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs();
+ uint64_t ElapsedNS = HotRemoteTimer.GetElapsedTimeUs() * 1000;
std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
- return LHS.FetchTime < RHS.FetchTime;
+ return LHS.FetchTimeNS < RHS.FetchTimeNS;
});
FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
- ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
+ uint64_t AvgNs = 0;
+ for (const FetchInfo& F : Timers)
+ {
+ AvgNs += F.FetchTimeNS;
+ }
+ AvgNs /= Timers.size();
+ ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}, avg {}",
Timers.size(),
- NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(ElapsedNS),
Worst.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Worst.FetchSize),
- NiceLatencyNs(Worst.FetchTime * 1000),
+ NiceLatencyNs(Worst.FetchTimeNS),
Best.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Best.FetchSize),
- NiceLatencyNs(Best.FetchTime * 1000),
+ NiceLatencyNs(Best.FetchTimeNS),
Median.FetchKey.ToHexString().substr(0, 6),
NiceBytes(Median.FetchSize),
- NiceLatencyNs(Median.FetchTime * 1000));
+ NiceLatencyNs(Median.FetchTimeNS),
+ NiceLatencyNs(AvgNs));
});
for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
{
@@ -2865,18 +2899,18 @@ TEST_CASE("zcache.rpc.upstream")
auto BodyView = CbObjectView(Body.GetData());
CacheResult Result;
+ Result.Success = false;
{
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
- Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
- Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
- while (!Result.Success && Result.ErrorCode == 1)
+ while (!Result.Success)
{
- Sleep(1000);
Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ if (!Result.Success)
+ {
+ Sleep(1000);
+ }
}
CHECK(Result.Success);
+ Timers[ItemIndex].FetchTimeNS = static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000.0);
}
CbPackage Response;