aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-26 00:40:36 +0200
committerDan Engelbrecht <[email protected]>2022-06-13 12:06:27 +0200
commit62e5a5a5537bfacd8e7dc8197b692763dce5bee4 (patch)
tree274e676c89c760dc62ad722b2df716d1460b5911
parentmore WIP (diff)
downloadzen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.tar.xz
zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.zip
WIP
-rw-r--r--zencore/compactbinarybuilder.cpp21
-rw-r--r--zencore/compactbinarypackage.cpp8
-rw-r--r--zencore/include/zencore/compactbinarybuilder.h2
-rw-r--r--zencore/include/zencore/compactbinarypackage.h2
-rw-r--r--zenhttp/httpasio.cpp5
-rw-r--r--zenserver-test/zenserver-test.cpp615
-rw-r--r--zenserver/cache/structuredcache.cpp58
-rw-r--r--zenserver/cache/structuredcache.h1
-rw-r--r--zenserver/upstream/upstreamcache.cpp5
9 files changed, 526 insertions, 191 deletions
diff --git a/zencore/compactbinarybuilder.cpp b/zencore/compactbinarybuilder.cpp
index d4ccd434d..471bd62eb 100644
--- a/zencore/compactbinarybuilder.cpp
+++ b/zencore/compactbinarybuilder.cpp
@@ -148,6 +148,15 @@ CbWriter::Save(BinaryWriter& Writer)
Writer.Write(Data.data(), Data.size());
}
+IoBuffer
+CbWriter::SaveAsIoBuffer()
+{
+ ZEN_ASSERT(States.size() == 1 && States.back().Flags == StateFlags::None);
+ // TEXT("It is invalid to save while there are incomplete write operations."));
+ ZEN_ASSERT(Data.size() > 0); // TEXT("It is invalid to save when nothing has been written."));
+ return IoBufferBuilder::MakeCloneFromMemory(Data.data(), Data.size());
+}
+
uint64_t
CbWriter::GetSaveSize() const
{
@@ -468,9 +477,9 @@ void
CbWriter::AddString(const std::string_view Value)
{
BeginField();
- const uint64_t Size = uint64_t(Value.size());
- const uint32_t SizeByteCount = MeasureVarUInt(Size);
- const int64_t Offset = Data.size();
+ const size_t Size = Value.size();
+ const size_t SizeByteCount = MeasureVarUInt(Size);
+ const size_t Offset = Data.size();
Data.resize(Offset + SizeByteCount + Size);
@@ -491,9 +500,9 @@ CbWriter::AddString(const std::wstring_view Value)
ExtendableStringBuilder<128> Utf8;
WideToUtf8(Value, Utf8);
- const uint32_t Size = uint32_t(Utf8.Size());
- const uint32_t SizeByteCount = MeasureVarUInt(Size);
- const int64_t Offset = Data.size();
+ const size_t Size = Utf8.Size();
+ const size_t SizeByteCount = MeasureVarUInt(Size);
+ const size_t Offset = Data.size();
Data.resize(Offset + SizeByteCount + Size);
uint8_t* StringData = Data.data() + Offset;
WriteVarUInt(Size, StringData);
diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp
index 6eaca5fdf..1586ee54a 100644
--- a/zencore/compactbinarypackage.cpp
+++ b/zencore/compactbinarypackage.cpp
@@ -612,6 +612,14 @@ CbPackage::Save(BinaryWriter& StreamWriter) const
Writer.Save(StreamWriter);
}
+IoBuffer
+CbPackage::SaveAsIoBuffer() const
+{
+ CbWriter Writer;
+ Save(Writer);
+ return Writer.SaveAsIoBuffer();
+}
+
//////////////////////////////////////////////////////////////////////////
//
// Legacy package serialization support
diff --git a/zencore/include/zencore/compactbinarybuilder.h b/zencore/include/zencore/compactbinarybuilder.h
index dbd010705..d95be558a 100644
--- a/zencore/include/zencore/compactbinarybuilder.h
+++ b/zencore/include/zencore/compactbinarybuilder.h
@@ -96,6 +96,8 @@ public:
ZENCORE_API void Save(BinaryWriter& Writer);
+ ZENCORE_API IoBuffer SaveAsIoBuffer();
+
/**
* The size of buffer (in bytes) required to serialize the fields that have been written.
*
diff --git a/zencore/include/zencore/compactbinarypackage.h b/zencore/include/zencore/compactbinarypackage.h
index c3e587f40..08e0a4d6c 100644
--- a/zencore/include/zencore/compactbinarypackage.h
+++ b/zencore/include/zencore/compactbinarypackage.h
@@ -331,6 +331,8 @@ public:
/** Save the object and attachments into the writer as a stream of compact binary fields. */
ZENCORE_API void Save(BinaryWriter& Writer) const;
+ ZENCORE_API IoBuffer SaveAsIoBuffer() const;
+
private:
ZENCORE_API void SetObject(CbObject Object, const IoHash* Hash, AttachmentResolver* Resolver);
ZENCORE_API void AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Resolver);
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp
index 1ca5fb1c1..3ca103bff 100644
--- a/zenhttp/httpasio.cpp
+++ b/zenhttp/httpasio.cpp
@@ -369,6 +369,10 @@ HttpServerConnection::TerminateConnection()
std::error_code Ec;
m_Socket->close(Ec);
+ if (Ec)
+ {
+ ZEN_ERROR("socket close received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
+ }
}
void
@@ -658,6 +662,7 @@ HttpRequest::AppendInputBytes(const char* Data, size_t Bytes)
}
// Terribad, but better than buffer overflow
+ ZEN_WARN("Not enough buffer space, need {}, have {}. Closing connection", Bytes, RemainingBufferSpace);
TerminateConnection();
}
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 0ffaf6b69..c91af976c 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -70,6 +70,65 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/scopeguard.h>
# include <unordered_set>
# include <unordered_map>
+
+template<>
+struct fmt::formatter<cpr::Response>
+{
+ constexpr auto parse(format_parse_context& Ctx) -> decltype(Ctx.begin()) { return Ctx.end(); }
+
+ template<typename FormatContext>
+ auto format(const cpr::Response& Response, FormatContext& Ctx) -> decltype(Ctx.out())
+ {
+ using namespace std::literals;
+
+ if (Response.status_code == 200 || Response.status_code == 201)
+ {
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed);
+ }
+ else
+ {
+ const auto It = Response.header.find("Content-Type");
+ const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv;
+
+ if (ContentType == "application/x-ue-cb"sv)
+ {
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::CbObjectView Obj(Body.Data());
+ zen::ExtendableStringBuilder<256> Sb;
+ std::string_view Json = Obj.ToJson(Sb).ToView();
+
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed,
+ Json,
+ Response.reason);
+ }
+ else
+ {
+ return fmt::format_to(Ctx.out(),
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reponse: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ Response.elapsed,
+ Response.text,
+ Response.reason);
+ }
+ }
+ }
+};
+
#endif
using namespace std::literals;
@@ -733,9 +792,10 @@ namespace utils {
.Args = std::move(Args)};
}
- static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
+ static ZenConfig NewWithUpstream(uint16_t UpstreamPort, std::string Args = "")
{
- return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
+ return New(13337,
+ fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{} {}", UpstreamPort, Args));
}
void Spawn(ZenServerInstance& Inst)
@@ -2374,25 +2434,23 @@ TEST_CASE("zcache.rpc.upstream")
{
using namespace utils;
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenConfig UpstreamCfg = ZenConfig::New(13338, "--http=asio");
ZenServerInstance UpstreamServer(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338, "--http=asio");
ZenServerInstance LocalServer(TestEnv);
SpawnServer(UpstreamServer, UpstreamCfg);
- SpawnServer(LocalServer, LocalCfg);
+ UpstreamServer.WaitUntilReady();
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port);
- const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port);
+ const auto LocalBaseUri = fmt::format("http://localhost:{}", LocalCfg.Port);
+ const auto UpstreamBaseUri = fmt::format("http://localhost:{}", UpstreamCfg.Port);
const std::string Bucket = "AutoTestDummy";
- const size_t PackageCount = 500;
- const size_t NumAttachments = 7;
- const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments;
+ const size_t NumAttachments = 7;
- auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) {
- const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t);
+ auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) {
+ const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t);
IoBuffer ValueContents(NumBytes);
uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData());
@@ -2408,139 +2466,295 @@ TEST_CASE("zcache.rpc.upstream")
return ValueContents;
};
- auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) {
- CbObjectWriter Builder;
- Builder.BeginObject("key"sv);
- Builder << "Bucket"sv << Bucket << "Hash"sv << Hash;
- Builder.EndObject();
- Builder.BeginArray("Values"sv);
- for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex)
- {
- Builder.BeginObject();
-
- std::string ValueName = fmt::format("%d", AttachmentIndex);
- Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash);
- Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash()));
- Builder.AddObjectId("Id"sv, ValueId);
- Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize());
- Builder.EndObject();
+ auto AppendCacheRecord =
+ [](CbPackage& Package, CbWriter& Writer, const zen::CacheKey& CacheKey, const IoBuffer& Payload, CachePolicy RecordPolicy) {
+ CompressedBuffer Value = CompressedBuffer::Compress(SharedBuffer(Payload));
+ CbAttachment Attachment(Value);
+
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
+ }
+ Writer.EndObject();
+ Writer.BeginArray("Values"sv);
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddObjectId("Id"sv, Oid::NewOid());
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Value.GetRawHash()));
+ Writer.AddInteger("RawSize"sv, Value.GetRawSize());
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ CacheRecordPolicy(RecordPolicy).Save(Writer);
+ }
+ Writer.EndObject();
+
+ Package.AddAttachment(Attachment);
+ };
+
+ auto CreateCachePackage = [&AppendCacheRecord](std::string_view Bucket,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& Attachments) -> CbPackage {
+ CbPackage Package;
+ CbWriter Writer;
+
+ Writer.BeginObject();
+ {
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (const auto& Entry : Attachments)
+ {
+ const IoHash& KeyHash = Entry.first;
+ const IoBuffer& Attachment = Entry.second;
+ const CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+ AppendCacheRecord(Package, Writer, CacheKey, Attachment, CachePolicy::Default);
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
}
- Builder.EndArray();
- return Builder.Save();
+ Writer.EndObject();
+ Package.SetObject(Writer.Save().AsObject());
+
+ return Package;
+ };
+
+ struct CacheResult
+ {
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
- auto GeneratePackage = [&GenerateAttachment,
- &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) {
- IoHashStream Hasher;
+ auto InvokeObjectViewRPC = [](std::string_view BaseUri, const CbObjectView& Request) -> CacheResult {
+ ExtendableStringBuilder<256> Uri;
+ Uri << BaseUri << "/z$/$rpc";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
- std::vector<CompressedBuffer> CompressedValues;
- CompressedValues.reserve(NumAttachments);
- for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex)
+ cpr::Session Session;
+ Session.SetBody({});
+ Session.SetHeader({});
+ Session.SetConnectTimeout(0);
+ Session.SetTimeout(0);
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ // ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
{
- IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex);
- Hasher.Append(ValueContents.GetView());
- auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents));
- size_t CompressedSize = CompressedData.GetCompressedSize();
- CHECK(CompressedSize > 0);
- CompressedValues.emplace_back(CompressedData);
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- IoHash Key = Hasher.GetHash();
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues);
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ };
- CbPackage Package;
- Package.SetObject(CacheRecord);
- for (const auto& Value : CompressedValues)
+ auto InvokePackageRPC = [](std::string_view BaseUri, const CbPackage& Request) -> CacheResult {
+ ExtendableStringBuilder<256> Uri;
+ Uri << BaseUri << "/z$/$rpc";
+
+ SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
+
+ cpr::Session Session;
+ Session.SetBody({});
+ Session.SetHeader({});
+ // Session.SetConnectTimeout(0);
+ // Session.SetTimeout(0);
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ // ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
{
- Package.AddAttachment(zen::CbAttachment(Value));
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- return Package;
- };
- // auto GeneratePackages =
- // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) {
- // std::vector<CbPackage> Packages;
- // Packages.reserve(PackageCount);
- // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
- // {
- // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset);
- // Packages.emplace_back(Package);
- // }
- // return Packages;
- // };
-
- const size_t Iterations = 50;
- std::vector<IoHash> AllPackages;
- AllPackages.reserve(PackageCount * Iterations);
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- WorkerThreadPool WorkerPool(64);
- for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
- {
- std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages;
- SentPackages.resize(PackageCount);
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ };
+
+ auto CreateGetCacheRecord = [](std::string_view Bucket,
+ IoHash Key,
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys,
+ zen::CachePolicy Policy,
+ BinaryWriter& Body) {
+ using namespace zen;
+ CbObjectWriter Request;
+ Request << "Method"sv
+ << "GetCacheRecords"sv;
+ Request.BeginObject("Params"sv);
{
- std::vector<uint64_t> Timers;
- Timers.resize(PackageCount);
- std::atomic<size_t> WorkCompleted = 0;
- size_t ItemIndex = 0;
- Stopwatch PutRemoteTimer;
- const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] {
- uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("put #{} items in remote in {}, worst {}, best {}",
- AllPackages.size(),
- NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
- });
- for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ Request << "DefaultPolicy"sv << WriteToString<128>(Policy);
+ Request.BeginArray("Requests"sv);
+ Request.BeginObject();
{
- WorkerPool.ScheduleWork([&GeneratePackage,
- &PackageIndex,
- &Iteration,
- &UpstreamBaseUri,
- &Bucket,
- &SentPackages,
- &WorkCompleted,
- &Timers,
- ItemIndex]() {
- CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount);
- zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView());
- zen::BinaryWriter MemStream;
- Package.Save(MemStream);
-
- cpr::Response Result;
+ Request.BeginObject("Key"sv);
+ {
+ Request << "Bucket"sv << Bucket << "Hash"sv << Key;
+ }
+ Request.EndObject();
+ }
+ Request.EndObject();
+ for (const auto& AttachmentKey : AttachmentKeys)
+ {
+ Request.BeginObject();
+ {
+ Request.BeginObject("Key"sv);
{
- Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)},
- cpr::Body{(const char*)MemStream.Data(), MemStream.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ Request << "Bucket"sv << Bucket << "Hash"sv << AttachmentKey;
}
- CHECK(Result.status_code == 201);
- std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments;
- Attachments.reserve(Package.GetAttachments().size());
- for (const CbAttachment& Attachment : Package.GetAttachments())
+ Request.EndObject();
+ }
+ Request.EndObject();
+ }
+ Request.EndArray();
+ }
+ Request.EndObject();
+
+ Request.Save(Body);
+ };
+
+ auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ const size_t PackageCount = 250;
+ const size_t Iterations = 20;
+
+ std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages;
+ SentPackages.resize(PackageCount * Iterations);
+
+ ZEN_INFO("generating #{} packages", Iterations * PackageCount);
+ WorkerThreadPool WorkerPool(32);
+ for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
+ {
+ size_t ItemOffset = Iteration * PackageCount;
+ std::vector<uint64_t> Timers;
+ Timers.resize(PackageCount);
+ size_t ItemIndex = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Stopwatch PutRemoteTimer;
+ const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] {
+ uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs();
+ std::sort(Timers.begin(), Timers.end());
+ uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
+ uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
+ ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}",
+ PackageCount,
+ NiceLatencyNs(ElapsedUS * 1000),
+ NiceLatencyNs(WorstUS * 1000),
+ NiceLatencyNs(BestUS * 1000),
+ Iteration + 1,
+ Iterations);
+ });
+ for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
+ {
+ WorkerPool.ScheduleWork([&CreateCachePackage,
+ &GenerateAttachment,
+ &InvokePackageRPC,
+ &PackageIndex,
+ &Iteration,
+ &UpstreamBaseUri,
+ &Bucket,
+ &SentPackages,
+ &WorkCompleted,
+ &Timers,
+ ItemIndex,
+ ItemOffset]() {
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Attachments;
+ Attachments.reserve(NumAttachments);
+ for (size_t AttachmentIdx = 0; AttachmentIdx < NumAttachments; ++AttachmentIdx)
+ {
+ IoBuffer Attachment =
+ GenerateAttachment((Iteration * PackageCount * NumAttachments) + (PackageCount * NumAttachments), AttachmentIdx);
+ IoHash Key = IoHash::HashBuffer(Attachment.GetView());
+
+ Attachments.insert_or_assign(Key, Attachment);
+ }
+
+ CbPackage Package = CreateCachePackage(Bucket, Attachments);
+
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
+ CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package);
+ if (!Result.Success)
{
- Attachments.insert(Attachment.AsCompressedBinary().GetRawHash());
+ Sleep(1000);
+ Result = InvokePackageRPC(UpstreamBaseUri, Package);
}
- SentPackages[ItemIndex] = {Key, Attachments};
- WorkCompleted.fetch_add(1);
- });
- ItemIndex++;
- }
- while (WorkCompleted < PackageCount)
- {
- Sleep(1);
- }
+ CHECK(Result.Success);
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys;
+ AttachmentKeys.reserve(Attachments.size());
+ for (const auto& Entry : Attachments)
+ {
+ AttachmentKeys.insert(Entry.first);
+ }
+ IoHash Key;
+ ((uint32_t*)(Key.Hash))[0] = (uint32_t)(Iteration * PackageCount + PackageIndex);
+ SentPackages[ItemIndex + ItemOffset] = make_pair(Key, AttachmentKeys);
+ WorkCompleted.fetch_add(1);
+ });
+ ItemIndex++;
}
+ while (WorkCompleted < PackageCount)
+ {
+ Sleep(1);
+ }
+ }
+
+ SpawnServer(LocalServer, LocalCfg);
+ LocalServer.WaitUntilReady();
+ struct FetchInfo
+ {
+ uint64_t FetchTime;
+ uint64_t FetchSize;
+ IoHash FetchKey;
+ };
+
+ std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > AllPackages;
+ AllPackages.reserve(PackageCount * Iterations);
+ for (size_t Iteration = 0; Iteration < Iterations; ++Iteration)
+ {
+ if (!AllPackages.empty())
{
- std::vector<uint64_t> Timers;
+ std::vector<FetchInfo> Timers;
Timers.resize(AllPackages.size());
std::atomic<size_t> WorkCompleted = 0;
size_t ItemIndex = 0;
@@ -2548,28 +2762,60 @@ TEST_CASE("zcache.rpc.upstream")
Stopwatch HotLocalTimer;
const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] {
uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}",
+ std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
+ return LHS.FetchTime < RHS.FetchTime;
+ });
+ FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
+ FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
+ FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
+ ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
AllPackages.size(),
NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
+ Worst.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Worst.FetchSize),
+ NiceLatencyNs(Worst.FetchTime * 1000),
+ Best.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Best.FetchSize),
+ NiceLatencyNs(Best.FetchTime * 1000),
+ Median.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Median.FetchSize),
+ NiceLatencyNs(Median.FetchTime * 1000));
});
- for (const IoHash& Key : AllPackages)
+ for (const auto& Entry : AllPackages)
{
- WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() {
- cpr::Response Result;
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- }
- CHECK(IsHttpSuccessCode(Result.status_code));
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() {
+ const IoHash& Key = Entry.first;
+ BinaryWriter Body;
+ CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body);
+ auto BodyView = CbObjectView(Body.GetData());
+
+ CacheResult Result;
+ {
+ Stopwatch Timer;
+ const auto _ =
+ MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ while (!Result.Success && Result.ErrorCode == 1)
+ {
+ Sleep(1000);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ }
+ CHECK(Result.Success);
+ }
+
+ CbPackage Response;
+ CHECK(Response.TryLoad(Result.Response));
+ CHECK(!Response.IsNull());
+ uint64_t Size = Response.GetObject().GetSize();
+ for (const auto& Attachment : Response.GetAttachments())
+ {
+ Size += Attachment.AsCompressedBinary().GetCompressedSize();
+ }
+ Timers[ItemIndex].FetchSize = Size;
+ Timers[ItemIndex].FetchKey = Key;
+ WorkCompleted.fetch_add(1);
+ });
ItemIndex++;
}
while (WorkCompleted < AllPackages.size())
@@ -2579,54 +2825,77 @@ TEST_CASE("zcache.rpc.upstream")
}
{
- std::vector<uint64_t> Timers;
- Timers.resize(SentPackages.size());
+ std::vector<FetchInfo> Timers;
+ Timers.resize(PackageCount);
std::atomic<size_t> WorkCompleted = 0;
size_t ItemIndex = 0;
Stopwatch HotRemoteTimer;
- const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] {
+ const auto _ = MakeGuard([&HotRemoteTimer, &Timers] {
uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs();
- std::sort(Timers.begin(), Timers.end());
- uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0;
- uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0;
- ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}",
- SentPackages.size(),
+ std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) {
+ return LHS.FetchTime < RHS.FetchTime;
+ });
+ FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{};
+ FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{};
+ FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{};
+ ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}",
+ Timers.size(),
NiceLatencyNs(ElapsedUS * 1000),
- NiceLatencyNs(WorstUS * 1000),
- NiceLatencyNs(BestUS * 1000));
+ Worst.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Worst.FetchSize),
+ NiceLatencyNs(Worst.FetchTime * 1000),
+ Best.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Best.FetchSize),
+ NiceLatencyNs(Best.FetchTime * 1000),
+ Median.FetchKey.ToHexString().substr(0, 6),
+ NiceBytes(Median.FetchSize),
+ NiceLatencyNs(Median.FetchTime * 1000));
});
- for (const auto& Entry : SentPackages)
+ for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex)
{
- WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() {
- const IoHash& Key = Entry.first;
- const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second;
- // Get package with attachments
- {
- cpr::Response Result;
+ const auto& Entry = SentPackages[Iteration * PackageCount + PackageIndex];
+
+ WorkerPool.ScheduleWork(
+ [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() {
+ const IoHash& Key = Entry.first;
+
+ BinaryWriter Body;
+ CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body);
+ auto BodyView = CbObjectView(Body.GetData());
+
+ CacheResult Result;
{
Stopwatch Timer;
- const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); });
- Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ const auto _ =
+ MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); });
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ while (!Result.Success && Result.ErrorCode == 1)
+ {
+ Sleep(1000);
+ Result = InvokeObjectViewRPC(LocalBaseUri, BodyView);
+ }
+ CHECK(Result.Success);
}
- CHECK(IsHttpSuccessCode(Result.status_code));
- IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
- CbPackage ResponsePackage;
- CHECK(ResponsePackage.TryLoad(Buffer));
- std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments;
- for (const auto& Attachement : ResponsePackage.GetAttachments())
+
+ CbPackage Response;
+ CHECK(Response.TryLoad(Result.Response));
+ CHECK(!Response.IsNull());
+ uint64_t Size = Response.GetObject().GetSize();
+ for (const auto& Attachment : Response.GetAttachments())
{
- ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash());
+ Size += Attachment.AsCompressedBinary().GetCompressedSize();
}
- CHECK(ReceivedAttachments == ExpectedAttachments);
- }
- WorkCompleted.fetch_add(1);
- });
+ Timers[ItemIndex].FetchSize = Size;
+ Timers[ItemIndex].FetchKey = Key;
+ WorkCompleted.fetch_add(1);
+ });
+
ItemIndex++;
- AllPackages.push_back(Entry.first);
+ AllPackages.push_back(Entry);
}
- while (WorkCompleted < SentPackages.size())
+ while (WorkCompleted < PackageCount)
{
Sleep(1);
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 45bbe062b..06b41a014 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -543,10 +543,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value = Package.SaveAsIoBuffer();
ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
}
}
@@ -688,18 +685,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
}
- BinaryWriter MemStream;
+ // BinaryWriter MemStream;
if (SkipData)
{
// Save a package containing only the object.
- CbPackage(Package.GetObject()).Save(MemStream);
+ ClientResultValue.Value = CbPackage(Package.GetObject()).SaveAsIoBuffer();
}
else
{
- Package.Save(MemStream);
+ ClientResultValue.Value = Package.SaveAsIoBuffer();
}
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
}
else
@@ -725,6 +721,18 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success)
{
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()));
+
+ // ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ // Ref.BucketSegment,
+ // Ref.HashKey,
+ // NiceBytes(ClientResultValue.Value.Size()),
+ // ToString(ClientResultValue.Value.GetContentType()));
+ //
ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)",
Ref.Namespace,
Ref.BucketSegment,
@@ -748,6 +756,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else
{
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ // ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
m_CacheStats.MissCount++;
AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -891,6 +901,22 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.GetSize()),
+ ToString(ContentType),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+ // ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
+ // Ref.BucketSegment,
+ // Ref.HashKey,
+ // NiceBytes(Body.GetSize()),
+ // ToString(ContentType),
+ // Count.New,
+ // Count.Valid,
+ // Count.Total);
ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
Ref.Namespace,
Ref.BucketSegment,
@@ -1238,6 +1264,20 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Invalid;
}
+ ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+ // ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ // Request.Key.Bucket,
+ // Request.Key.Hash,
+ // NiceBytes(TransferredSize),
+ // Count.New,
+ // Count.Valid,
+ // Count.Total);
ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
Request.Namespace,
Request.Key.Bucket,
@@ -1776,6 +1816,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
}
else
{
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ // ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash);
ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
*Namespace,
Key.Bucket,
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 890a2ebab..f1bf8bdaf 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -157,6 +157,7 @@ private:
metrics::OperationTiming m_HttpRequests;
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
+ std::atomic_uint64_t m_InFlightRequests = 0;
};
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 98b4439c7..c3dd1eecd 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -269,10 +269,7 @@ namespace detail {
if (Result.Success)
{
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Response = Package.SaveAsIoBuffer();
}
}
}