diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-26 00:40:36 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-06-13 12:06:27 +0200 |
| commit | 62e5a5a5537bfacd8e7dc8197b692763dce5bee4 (patch) | |
| tree | 274e676c89c760dc62ad722b2df716d1460b5911 | |
| parent | more WIP (diff) | |
| download | zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.tar.xz zen-62e5a5a5537bfacd8e7dc8197b692763dce5bee4.zip | |
WIP
| -rw-r--r-- | zencore/compactbinarybuilder.cpp | 21 | ||||
| -rw-r--r-- | zencore/compactbinarypackage.cpp | 8 | ||||
| -rw-r--r-- | zencore/include/zencore/compactbinarybuilder.h | 2 | ||||
| -rw-r--r-- | zencore/include/zencore/compactbinarypackage.h | 2 | ||||
| -rw-r--r-- | zenhttp/httpasio.cpp | 5 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 615 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 58 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 5 |
9 files changed, 526 insertions, 191 deletions
diff --git a/zencore/compactbinarybuilder.cpp b/zencore/compactbinarybuilder.cpp index d4ccd434d..471bd62eb 100644 --- a/zencore/compactbinarybuilder.cpp +++ b/zencore/compactbinarybuilder.cpp @@ -148,6 +148,15 @@ CbWriter::Save(BinaryWriter& Writer) Writer.Write(Data.data(), Data.size()); } +IoBuffer +CbWriter::SaveAsIoBuffer() +{ + ZEN_ASSERT(States.size() == 1 && States.back().Flags == StateFlags::None); + // TEXT("It is invalid to save while there are incomplete write operations.")); + ZEN_ASSERT(Data.size() > 0); // TEXT("It is invalid to save when nothing has been written.")); + return IoBufferBuilder::MakeCloneFromMemory(Data.data(), Data.size()); +} + uint64_t CbWriter::GetSaveSize() const { @@ -468,9 +477,9 @@ void CbWriter::AddString(const std::string_view Value) { BeginField(); - const uint64_t Size = uint64_t(Value.size()); - const uint32_t SizeByteCount = MeasureVarUInt(Size); - const int64_t Offset = Data.size(); + const size_t Size = Value.size(); + const size_t SizeByteCount = MeasureVarUInt(Size); + const size_t Offset = Data.size(); Data.resize(Offset + SizeByteCount + Size); @@ -491,9 +500,9 @@ CbWriter::AddString(const std::wstring_view Value) ExtendableStringBuilder<128> Utf8; WideToUtf8(Value, Utf8); - const uint32_t Size = uint32_t(Utf8.Size()); - const uint32_t SizeByteCount = MeasureVarUInt(Size); - const int64_t Offset = Data.size(); + const size_t Size = Utf8.Size(); + const size_t SizeByteCount = MeasureVarUInt(Size); + const size_t Offset = Data.size(); Data.resize(Offset + SizeByteCount + Size); uint8_t* StringData = Data.data() + Offset; WriteVarUInt(Size, StringData); diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp index 6eaca5fdf..1586ee54a 100644 --- a/zencore/compactbinarypackage.cpp +++ b/zencore/compactbinarypackage.cpp @@ -612,6 +612,14 @@ CbPackage::Save(BinaryWriter& StreamWriter) const Writer.Save(StreamWriter); } +IoBuffer +CbPackage::SaveAsIoBuffer() const +{ + CbWriter Writer; + Save(Writer); + return Writer.SaveAsIoBuffer(); +} + ////////////////////////////////////////////////////////////////////////// // // Legacy package serialization support diff --git a/zencore/include/zencore/compactbinarybuilder.h b/zencore/include/zencore/compactbinarybuilder.h index dbd010705..d95be558a 100644 --- a/zencore/include/zencore/compactbinarybuilder.h +++ b/zencore/include/zencore/compactbinarybuilder.h @@ -96,6 +96,8 @@ public: ZENCORE_API void Save(BinaryWriter& Writer); + ZENCORE_API IoBuffer SaveAsIoBuffer(); + /** * The size of buffer (in bytes) required to serialize the fields that have been written. * diff --git a/zencore/include/zencore/compactbinarypackage.h b/zencore/include/zencore/compactbinarypackage.h index c3e587f40..08e0a4d6c 100644 --- a/zencore/include/zencore/compactbinarypackage.h +++ b/zencore/include/zencore/compactbinarypackage.h @@ -331,6 +331,8 @@ public: /** Save the object and attachments into the writer as a stream of compact binary fields. */ ZENCORE_API void Save(BinaryWriter& Writer) const; + ZENCORE_API IoBuffer SaveAsIoBuffer() const; + private: ZENCORE_API void SetObject(CbObject Object, const IoHash* Hash, AttachmentResolver* Resolver); ZENCORE_API void AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Resolver); diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index 1ca5fb1c1..3ca103bff 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -369,6 +369,10 @@ HttpServerConnection::TerminateConnection() std::error_code Ec; m_Socket->close(Ec); + if (Ec) + { + ZEN_ERROR("socket close received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + } } void @@ -658,6 +662,7 @@ HttpRequest::AppendInputBytes(const char* Data, size_t Bytes) } // Terribad, but better than buffer overflow + ZEN_WARN("Not enough buffer space, need {}, have {}. Closing connection", Bytes, RemainingBufferSpace); TerminateConnection(); } diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0ffaf6b69..c91af976c 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -70,6 +70,65 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/scopeguard.h> # include <unordered_set> # include <unordered_map> + +template<> +struct fmt::formatter<cpr::Response> +{ + constexpr auto parse(format_parse_context& Ctx) -> decltype(Ctx.begin()) { return Ctx.end(); } + + template<typename FormatContext> + auto format(const cpr::Response& Response, FormatContext& Ctx) -> decltype(Ctx.out()) + { + using namespace std::literals; + + if (Response.status_code == 200 || Response.status_code == 201) + { + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed); + } + else + { + const auto It = Response.header.find("Content-Type"); + const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv; + + if (ContentType == "application/x-ue-cb"sv) + { + zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::CbObjectView Obj(Body.Data()); + zen::ExtendableStringBuilder<256> Sb; + std::string_view Json = Obj.ToJson(Sb).ToView(); + + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed, + Json, + Response.reason); + } + else + { + return fmt::format_to(Ctx.out(), + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reponse: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.uploaded_bytes, + Response.downloaded_bytes, + Response.elapsed, + Response.text, + Response.reason); + } + } + } +}; + #endif using namespace std::literals; @@ -733,9 +792,10 @@ namespace utils { .Args = std::move(Args)}; } - static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + static ZenConfig NewWithUpstream(uint16_t UpstreamPort, std::string Args = "") { - return New(13337, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort)); + return New(13337, + fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{} {}", UpstreamPort, Args)); } void Spawn(ZenServerInstance& Inst) @@ -2374,25 +2434,23 @@ TEST_CASE("zcache.rpc.upstream") { using namespace utils; - ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenConfig UpstreamCfg = ZenConfig::New(13338, "--http=asio"); ZenServerInstance UpstreamServer(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338, "--http=asio"); ZenServerInstance LocalServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); - SpawnServer(LocalServer, LocalCfg); + UpstreamServer.WaitUntilReady(); - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalCfg.Port); - const auto UpstreamBaseUri = fmt::format("http://localhost:{}/z$", UpstreamCfg.Port); + const auto LocalBaseUri = fmt::format("http://localhost:{}", LocalCfg.Port); + const auto UpstreamBaseUri = fmt::format("http://localhost:{}", UpstreamCfg.Port); const std::string Bucket = "AutoTestDummy"; - const size_t PackageCount = 500; - const size_t NumAttachments = 7; - const size_t BufferSizeMultiplier = (1024 * 1024) / NumAttachments; + const size_t NumAttachments = 7; - auto GenerateAttachment = [BufferSizeMultiplier](size_t DataIndexOffset, size_t ValueIndex) { - const size_t NumBytes = (ValueIndex + 1) * BufferSizeMultiplier * sizeof(uint32_t); + auto GenerateAttachment = [](size_t DataIndexOffset, size_t ValueIndex) { + const size_t NumBytes = (1ull << (ValueIndex + 1) * 3) * sizeof(uint32_t); IoBuffer ValueContents(NumBytes); uint8_t* DataPtr = reinterpret_cast<uint8_t*>(ValueContents.MutableData()); @@ -2408,139 +2466,295 @@ TEST_CASE("zcache.rpc.upstream") return ValueContents; }; - auto GenerateCacheRecord = [](IoHash Hash, const std::string& Bucket, const std::vector<CompressedBuffer>& Attachments) { - CbObjectWriter Builder; - Builder.BeginObject("key"sv); - Builder << "Bucket"sv << Bucket << "Hash"sv << Hash; - Builder.EndObject(); - Builder.BeginArray("Values"sv); - for (size_t AttachmentIndex = 0; AttachmentIndex < Attachments.size(); ++AttachmentIndex) - { - Builder.BeginObject(); - - std::string ValueName = fmt::format("%d", AttachmentIndex); - Oid ValueId = Oid::FromMemory(IoHash::HashBuffer(ValueName.data(), ValueName.size() * sizeof(ValueName.data()[0])).Hash); - Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Attachments[AttachmentIndex].GetRawHash())); - Builder.AddObjectId("Id"sv, ValueId); - Builder.AddInteger("RawSize"sv, Attachments[AttachmentIndex].GetRawSize()); - Builder.EndObject(); + auto AppendCacheRecord = + [](CbPackage& Package, CbWriter& Writer, const zen::CacheKey& CacheKey, const IoBuffer& Payload, CachePolicy RecordPolicy) { + CompressedBuffer Value = CompressedBuffer::Compress(SharedBuffer(Payload)); + CbAttachment Attachment(Value); + + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; + } + Writer.EndObject(); + Writer.BeginArray("Values"sv); + { + Writer.BeginObject(); + { + Writer.AddObjectId("Id"sv, Oid::NewOid()); + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Value.GetRawHash())); + Writer.AddInteger("RawSize"sv, Value.GetRawSize()); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + CacheRecordPolicy(RecordPolicy).Save(Writer); + } + Writer.EndObject(); + + Package.AddAttachment(Attachment); + }; + + auto CreateCachePackage = [&AppendCacheRecord](std::string_view Bucket, + const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& Attachments) -> CbPackage { + CbPackage Package; + CbWriter Writer; + + Writer.BeginObject(); + { + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (const auto& Entry : Attachments) + { + const IoHash& KeyHash = Entry.first; + const IoBuffer& Attachment = Entry.second; + const CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + AppendCacheRecord(Package, Writer, CacheKey, Attachment, CachePolicy::Default); + } + Writer.EndArray(); + } + Writer.EndObject(); } - Builder.EndArray(); - return Builder.Save(); + Writer.EndObject(); + Package.SetObject(Writer.Save().AsObject()); + + return Package; + }; + + struct CacheResult + { + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; - auto GeneratePackage = [&GenerateAttachment, - &GenerateCacheRecord](const std::string Bucket, size_t NumAttachments, size_t DataIndexOffset) { - IoHashStream Hasher; + auto InvokeObjectViewRPC = [](std::string_view BaseUri, const CbObjectView& Request) -> CacheResult { + ExtendableStringBuilder<256> Uri; + Uri << BaseUri << "/z$/$rpc"; + + BinaryWriter Body; + Request.CopyTo(Body); - std::vector<CompressedBuffer> CompressedValues; - CompressedValues.reserve(NumAttachments); - for (size_t AttachmentIndex = 0; AttachmentIndex < NumAttachments; ++AttachmentIndex) + cpr::Session Session; + Session.SetBody({}); + Session.SetHeader({}); + Session.SetConnectTimeout(0); + Session.SetTimeout(0); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + + cpr::Response Response = Session.Post(); + // ZEN_DEBUG("POST {}", Response); + + if (Response.error) { - IoBuffer ValueContents = GenerateAttachment(BufferSizeMultiplier * DataIndexOffset, AttachmentIndex); - Hasher.Append(ValueContents.GetView()); - auto CompressedData = CompressedBuffer::Compress(SharedBuffer(ValueContents)); - size_t CompressedSize = CompressedData.GetCompressedSize(); - CHECK(CompressedSize > 0); - CompressedValues.emplace_back(CompressedData); + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - IoHash Key = Hasher.GetHash(); + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - CbObject CacheRecord = GenerateCacheRecord(Key, Bucket, CompressedValues); + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + }; - CbPackage Package; - Package.SetObject(CacheRecord); - for (const auto& Value : CompressedValues) + auto InvokePackageRPC = [](std::string_view BaseUri, const CbPackage& Request) -> CacheResult { + ExtendableStringBuilder<256> Uri; + Uri << BaseUri << "/z$/$rpc"; + + SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); + + cpr::Session Session; + Session.SetBody({}); + Session.SetHeader({}); + // Session.SetConnectTimeout(0); + // Session.SetTimeout(0); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + + cpr::Response Response = Session.Post(); + // ZEN_DEBUG("POST {}", Response); + + if (Response.error) { - Package.AddAttachment(zen::CbAttachment(Value)); + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - return Package; - }; - // auto GeneratePackages = - // [&GeneratePackage](const std::string Bucket, size_t PackageCount, size_t NumAttachments, size_t DataIndexOffset) { - // std::vector<CbPackage> Packages; - // Packages.reserve(PackageCount); - // for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) - // { - // CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + DataIndexOffset); - // Packages.emplace_back(Package); - // } - // return Packages; - // }; - - const size_t Iterations = 50; - std::vector<IoHash> AllPackages; - AllPackages.reserve(PackageCount * Iterations); + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - WorkerThreadPool WorkerPool(64); - for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) - { - std::vector<std::pair<IoHash, std::unordered_set<BLAKE3, BLAKE3::Hasher> > > SentPackages; - SentPackages.resize(PackageCount); + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + }; + + auto CreateGetCacheRecord = [](std::string_view Bucket, + IoHash Key, + std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys, + zen::CachePolicy Policy, + BinaryWriter& Body) { + using namespace zen; + CbObjectWriter Request; + Request << "Method"sv + << "GetCacheRecords"sv; + Request.BeginObject("Params"sv); { - std::vector<uint64_t> Timers; - Timers.resize(PackageCount); - std::atomic<size_t> WorkCompleted = 0; - size_t ItemIndex = 0; - Stopwatch PutRemoteTimer; - const auto _ = MakeGuard([&AllPackages, &PutRemoteTimer, &Timers] { - uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("put #{} items in remote in {}, worst {}, best {}", - AllPackages.size(), - NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); - }); - for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + Request << "DefaultPolicy"sv << WriteToString<128>(Policy); + Request.BeginArray("Requests"sv); + Request.BeginObject(); { - WorkerPool.ScheduleWork([&GeneratePackage, - &PackageIndex, - &Iteration, - &UpstreamBaseUri, - &Bucket, - &SentPackages, - &WorkCompleted, - &Timers, - ItemIndex]() { - CbPackage Package = GeneratePackage(Bucket, NumAttachments, PackageIndex + Iteration * PackageCount); - zen::IoHash Key = IoHash::HashBuffer(Package.GetObject().GetBuffer().GetView()); - zen::BinaryWriter MemStream; - Package.Save(MemStream); - - cpr::Response Result; + Request.BeginObject("Key"sv); + { + Request << "Bucket"sv << Bucket << "Hash"sv << Key; + } + Request.EndObject(); + } + Request.EndObject(); + for (const auto& AttachmentKey : AttachmentKeys) + { + Request.BeginObject(); + { + Request.BeginObject("Key"sv); { - Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamBaseUri, Bucket, Key)}, - cpr::Body{(const char*)MemStream.Data(), MemStream.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + Request << "Bucket"sv << Bucket << "Hash"sv << AttachmentKey; } - CHECK(Result.status_code == 201); - std::unordered_set<BLAKE3, BLAKE3::Hasher> Attachments; - Attachments.reserve(Package.GetAttachments().size()); - for (const CbAttachment& Attachment : Package.GetAttachments()) + Request.EndObject(); + } + Request.EndObject(); + } + Request.EndArray(); + } + Request.EndObject(); + + Request.Save(Body); + }; + + auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::BinaryWriter MemStream; + Package.Save(MemStream); + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + const size_t PackageCount = 250; + const size_t Iterations = 20; + + std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > SentPackages; + SentPackages.resize(PackageCount * Iterations); + + ZEN_INFO("generating #{} packages", Iterations * PackageCount); + WorkerThreadPool WorkerPool(32); + for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) + { + size_t ItemOffset = Iteration * PackageCount; + std::vector<uint64_t> Timers; + Timers.resize(PackageCount); + size_t ItemIndex = 0; + std::atomic<size_t> WorkCompleted = 0; + Stopwatch PutRemoteTimer; + const auto _ = MakeGuard([PackageCount, &PutRemoteTimer, &Timers, Iteration, Iterations] { + uint64_t ElapsedUS = PutRemoteTimer.GetElapsedTimeUs(); + std::sort(Timers.begin(), Timers.end()); + uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; + uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; + ZEN_INFO("put #{} items in remote in {}, worst {}, best {}, {}/{}", + PackageCount, + NiceLatencyNs(ElapsedUS * 1000), + NiceLatencyNs(WorstUS * 1000), + NiceLatencyNs(BestUS * 1000), + Iteration + 1, + Iterations); + }); + for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) + { + WorkerPool.ScheduleWork([&CreateCachePackage, + &GenerateAttachment, + &InvokePackageRPC, + &PackageIndex, + &Iteration, + &UpstreamBaseUri, + &Bucket, + &SentPackages, + &WorkCompleted, + &Timers, + ItemIndex, + ItemOffset]() { + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Attachments; + Attachments.reserve(NumAttachments); + for (size_t AttachmentIdx = 0; AttachmentIdx < NumAttachments; ++AttachmentIdx) + { + IoBuffer Attachment = + GenerateAttachment((Iteration * PackageCount * NumAttachments) + (PackageCount * NumAttachments), AttachmentIdx); + IoHash Key = IoHash::HashBuffer(Attachment.GetView()); + + Attachments.insert_or_assign(Key, Attachment); + } + + CbPackage Package = CreateCachePackage(Bucket, Attachments); + + { + Stopwatch Timer; + const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); + CacheResult Result = InvokePackageRPC(UpstreamBaseUri, Package); + if (!Result.Success) { - Attachments.insert(Attachment.AsCompressedBinary().GetRawHash()); + Sleep(1000); + Result = InvokePackageRPC(UpstreamBaseUri, Package); } - SentPackages[ItemIndex] = {Key, Attachments}; - WorkCompleted.fetch_add(1); - }); - ItemIndex++; - } - while (WorkCompleted < PackageCount) - { - Sleep(1); - } + CHECK(Result.Success); + } + + std::unordered_set<IoHash, IoHash::Hasher> AttachmentKeys; + AttachmentKeys.reserve(Attachments.size()); + for (const auto& Entry : Attachments) + { + AttachmentKeys.insert(Entry.first); + } + IoHash Key; + ((uint32_t*)(Key.Hash))[0] = (uint32_t)(Iteration * PackageCount + PackageIndex); + SentPackages[ItemIndex + ItemOffset] = make_pair(Key, AttachmentKeys); + WorkCompleted.fetch_add(1); + }); + ItemIndex++; } + while (WorkCompleted < PackageCount) + { + Sleep(1); + } + } + + SpawnServer(LocalServer, LocalCfg); + LocalServer.WaitUntilReady(); + struct FetchInfo + { + uint64_t FetchTime; + uint64_t FetchSize; + IoHash FetchKey; + }; + + std::vector<std::pair<IoHash, std::unordered_set<IoHash, IoHash::Hasher> > > AllPackages; + AllPackages.reserve(PackageCount * Iterations); + for (size_t Iteration = 0; Iteration < Iterations; ++Iteration) + { + if (!AllPackages.empty()) { - std::vector<uint64_t> Timers; + std::vector<FetchInfo> Timers; Timers.resize(AllPackages.size()); std::atomic<size_t> WorkCompleted = 0; size_t ItemIndex = 0; @@ -2548,28 +2762,60 @@ TEST_CASE("zcache.rpc.upstream") Stopwatch HotLocalTimer; const auto _ = MakeGuard([&AllPackages, &HotLocalTimer, &Timers] { uint64_t ElapsedUS = HotLocalTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst {}, best {}", + std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { + return LHS.FetchTime < RHS.FetchTime; + }); + FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; + FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; + FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; + ZEN_INFO("fetched #{} hot local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", AllPackages.size(), NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); + Worst.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Worst.FetchSize), + NiceLatencyNs(Worst.FetchTime * 1000), + Best.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Best.FetchSize), + NiceLatencyNs(Best.FetchTime * 1000), + Median.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Median.FetchSize), + NiceLatencyNs(Median.FetchTime * 1000)); }); - for (const IoHash& Key : AllPackages) + for (const auto& Entry : AllPackages) { - WorkerPool.ScheduleWork([&LocalBaseUri, &Bucket, &Key, &WorkCompleted, &Timers, ItemIndex]() { - cpr::Response Result; - { - Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - } - CHECK(IsHttpSuccessCode(Result.status_code)); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() { + const IoHash& Key = Entry.first; + BinaryWriter Body; + CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body); + auto BodyView = CbObjectView(Body.GetData()); + + CacheResult Result; + { + Stopwatch Timer; + const auto _ = + MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + while (!Result.Success && Result.ErrorCode == 1) + { + Sleep(1000); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + } + CHECK(Result.Success); + } + + CbPackage Response; + CHECK(Response.TryLoad(Result.Response)); + CHECK(!Response.IsNull()); + uint64_t Size = Response.GetObject().GetSize(); + for (const auto& Attachment : Response.GetAttachments()) + { + Size += Attachment.AsCompressedBinary().GetCompressedSize(); + } + Timers[ItemIndex].FetchSize = Size; + Timers[ItemIndex].FetchKey = Key; + WorkCompleted.fetch_add(1); + }); ItemIndex++; } while (WorkCompleted < AllPackages.size()) @@ -2579,54 +2825,77 @@ TEST_CASE("zcache.rpc.upstream") } { - std::vector<uint64_t> Timers; - Timers.resize(SentPackages.size()); + std::vector<FetchInfo> Timers; + Timers.resize(PackageCount); std::atomic<size_t> WorkCompleted = 0; size_t ItemIndex = 0; Stopwatch HotRemoteTimer; - const auto _ = MakeGuard([&SentPackages, &HotRemoteTimer, &Timers] { + const auto _ = MakeGuard([&HotRemoteTimer, &Timers] { uint64_t ElapsedUS = HotRemoteTimer.GetElapsedTimeUs(); - std::sort(Timers.begin(), Timers.end()); - uint64_t WorstUS = Timers.size() > 0 ? Timers.back() : 0; - uint64_t BestUS = Timers.size() > 0 ? Timers.front() : 0; - ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst {}, best {}", - SentPackages.size(), + std::sort(Timers.begin(), Timers.end(), [](const struct FetchInfo& LHS, const struct FetchInfo& RHS) { + return LHS.FetchTime < RHS.FetchTime; + }); + FetchInfo Worst = Timers.size() > 0 ? Timers.back() : FetchInfo{}; + FetchInfo Best = Timers.size() > 0 ? Timers.front() : FetchInfo{}; + FetchInfo Median = Timers.size() > 0 ? Timers[Timers.size() / 2] : FetchInfo{}; + ZEN_INFO("fetched #{} cold local, hot remote items in {}, worst [{}]:{}:{}, best [{}]:{}:{}, median [{}]:{}:{}", + Timers.size(), NiceLatencyNs(ElapsedUS * 1000), - NiceLatencyNs(WorstUS * 1000), - NiceLatencyNs(BestUS * 1000)); + Worst.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Worst.FetchSize), + NiceLatencyNs(Worst.FetchTime * 1000), + Best.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Best.FetchSize), + NiceLatencyNs(Best.FetchTime * 1000), + Median.FetchKey.ToHexString().substr(0, 6), + NiceBytes(Median.FetchSize), + NiceLatencyNs(Median.FetchTime * 1000)); }); - for (const auto& Entry : SentPackages) + for (size_t PackageIndex = 0; PackageIndex < PackageCount; ++PackageIndex) { - WorkerPool.ScheduleWork([&Entry, &LocalBaseUri, &Bucket, &WorkCompleted, &Timers, ItemIndex]() { - const IoHash& Key = Entry.first; - const std::unordered_set<BLAKE3, BLAKE3::Hasher>& ExpectedAttachments = Entry.second; - // Get package with attachments - { - cpr::Response Result; + const auto& Entry = SentPackages[Iteration * PackageCount + PackageIndex]; + + WorkerPool.ScheduleWork( + [&LocalBaseUri, &Bucket, &Entry, &WorkCompleted, &Timers, ItemIndex, &CreateGetCacheRecord, &InvokeObjectViewRPC]() { + const IoHash& Key = Entry.first; + + BinaryWriter Body; + CreateGetCacheRecord(Bucket, Key, Entry.second, CachePolicy::Default, Body); + auto BodyView = CbObjectView(Body.GetData()); + + CacheResult Result; { Stopwatch Timer; - const auto _ = MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex] = Timer.GetElapsedTimeUs(); }); - Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default", LocalBaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + const auto _ = + MakeGuard([&Timer, &Timers, ItemIndex] { Timers[ItemIndex].FetchTime = Timer.GetElapsedTimeUs(); }); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + while (!Result.Success && Result.ErrorCode == 1) + { + Sleep(1000); + Result = InvokeObjectViewRPC(LocalBaseUri, BodyView); + } + CHECK(Result.Success); } - CHECK(IsHttpSuccessCode(Result.status_code)); - IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); - CbPackage ResponsePackage; - CHECK(ResponsePackage.TryLoad(Buffer)); - std::unordered_set<BLAKE3, BLAKE3::Hasher> ReceivedAttachments; - for (const auto& Attachement : ResponsePackage.GetAttachments()) + + CbPackage Response; + CHECK(Response.TryLoad(Result.Response)); + CHECK(!Response.IsNull()); + uint64_t Size = Response.GetObject().GetSize(); + for (const auto& Attachment : Response.GetAttachments()) { - ReceivedAttachments.insert(Attachement.AsCompressedBinary().GetRawHash()); + Size += Attachment.AsCompressedBinary().GetCompressedSize(); } - CHECK(ReceivedAttachments == ExpectedAttachments); - } - WorkCompleted.fetch_add(1); - }); + Timers[ItemIndex].FetchSize = Size; + Timers[ItemIndex].FetchKey = Key; + WorkCompleted.fetch_add(1); + }); + ItemIndex++; - AllPackages.push_back(Entry.first); + AllPackages.push_back(Entry); } - while (WorkCompleted < SentPackages.size()) + while (WorkCompleted < PackageCount) { Sleep(1); } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 45bbe062b..06b41a014 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -543,10 +543,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); - BinaryWriter MemStream; - Package.Save(MemStream); - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value = Package.SaveAsIoBuffer(); ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } @@ -688,18 +685,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } - BinaryWriter MemStream; + // BinaryWriter MemStream; if (SkipData) { // Save a package containing only the object. - CbPackage(Package.GetObject()).Save(MemStream); + ClientResultValue.Value = CbPackage(Package.GetObject()).SaveAsIoBuffer(); } else { - Package.Save(MemStream); + ClientResultValue.Value = Package.SaveAsIoBuffer(); } - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else @@ -725,6 +721,18 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); + + // ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + // Ref.BucketSegment, + // Ref.HashKey, + // NiceBytes(ClientResultValue.Value.Size()), + // ToString(ClientResultValue.Value.GetContentType())); + // ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)", Ref.Namespace, Ref.BucketSegment, @@ -748,6 +756,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else { + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + // ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); @@ -891,6 +901,22 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.GetSize()), + ToString(ContentType), + Count.New, + Count.Valid, + Count.Total); + // ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + // Ref.BucketSegment, + // Ref.HashKey, + // NiceBytes(Body.GetSize()), + // ToString(ContentType), + // Count.New, + // Count.Valid, + // Count.Total); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.Namespace, Ref.BucketSegment, @@ -1238,6 +1264,20 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Invalid; } + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + // ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + // Request.Key.Bucket, + // Request.Key.Hash, + // NiceBytes(TransferredSize), + // Count.New, + // Count.Valid, + // Count.Total); ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", Request.Namespace, Request.Key.Bucket, @@ -1776,6 +1816,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } else { + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); + // ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", *Namespace, Key.Bucket, diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 890a2ebab..f1bf8bdaf 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -157,6 +157,7 @@ private: metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; + std::atomic_uint64_t m_InFlightRequests = 0; }; /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 98b4439c7..c3dd1eecd 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -269,10 +269,7 @@ namespace detail { if (Result.Success) { - BinaryWriter MemStream; - Package.Save(MemStream); - - Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + Result.Response = Package.SaveAsIoBuffer(); } } } |