diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-21 10:58:28 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-21 10:58:28 +0100 |
| commit | 9b82209e3368de806a48554d85cb7c364973eb2d (patch) | |
| tree | 8b52dbe1fd5a1a3255c51198a7b31bef77cf9009 /src/zenserver-test/zenserver-test.cpp | |
| parent | improved process monitoring behaviour with invalid pids (#16) (diff) | |
| download | zen-9b82209e3368de806a48554d85cb7c364973eb2d.tar.xz zen-9b82209e3368de806a48554d85cb7c364973eb2d.zip | |
add support for responding with partial cache chunks (#11)
* add support for responding with partial cache chunks
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 231 |
1 files changed, 224 insertions, 7 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 4a368c992..691047f90 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1771,6 +1771,223 @@ TEST_CASE("zcache.failing.upstream") } } +TEST_CASE("zcache.rpc.partialchunks") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Server(TestEnv); + SpawnServer(Server, LocalCfg); + + std::vector<CompressedBuffer> Attachments; + + const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort()); + + auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey { + IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyIndex; + return CacheKey::Create(Bucket, KeyHash); + }; + + auto CreateSemiRandomBlob = [](size_t AttachmentSize) -> CompressedBuffer { + // Convoluted way to get a compressed buffer whose result it large enough to be a separate file + // but also does actually compress + const size_t PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1; + const size_t PartSize = AttachmentSize / PartCount; + auto Part = SharedBuffer(CreateRandomBlob(PartSize)); + std::vector<SharedBuffer> Parts(PartCount, Part); + size_t RemainPartSize = AttachmentSize - (PartSize * PartCount); + if (RemainPartSize > 0) + { + Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize))); + } + CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts))); + return Value; + }; + + auto AppendCacheRecord = [&CreateSemiRandomBlob](cacherequests::PutCacheRecordsRequest& Request, + const CacheKey& CacheKey, + size_t AttachmentCount, + size_t AttachmentsSize, + CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> { + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers; + std::vector<cacherequests::PutCacheRecordRequestValue> Attachments; + for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++) + { + CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize); + AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value)); + Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)}); + } + Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy}); + return AttachmentBuffers; + }; + + auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey]( + std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t KeyOffset, + size_t Num, + size_t AttachmentCount, + size_t AttachmentsSize = + 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> { + std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key); + std::vector<std::pair<Oid, CompressedBuffer>> Attachments = + AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default); + Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments))); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + Keys.clear(); + } + + return Keys; + }; + + std::string_view TestBucket = "partialcachevaluetests"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u); + CHECK(RecordsWithSmallAttachments.size() == 3); + auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u); + CHECK(RecordsWithLargeAttachments.size() == 1); + + struct PartialOptions + { + uint64_t Offset = 0ull; + uint64_t Size = ~0ull; + RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone; + }; + + auto GetCacheChunk = [](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ValueId, + const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult { + cacherequests::GetCacheChunksRequest Request = { + .AcceptMagic = kCbPkgMagic, + .AcceptOptions = (uint16_t)Options.AcceptOptions, + .Namespace = std::string(Namespace), + .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}}; + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); + cacherequests::GetCacheChunksResult GetCacheChunksResult; + CHECK(GetCacheChunksResult.Parse(Response)); + return GetCacheChunksResult; + }; + + auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ChunkId, + const CompressedBuffer& VerifyData, + const PartialOptions& Options = {}) { + cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options); + CHECK(Result.Results.size() == 1); + bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks); + if (!CanGetPartial) + { + CHECK(Result.Results[0].RawOffset == 0); + CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize()); + } + IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer(); + IoBuffer ReceivedDecompressed = + Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].RawOffset, Options.Size).AsIoBuffer(); + CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView())); + }; + + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, + .Size = 519, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences | + RpcAcceptOptions::kAllowPartialCacheChunks}); +} + TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; @@ -2326,9 +2543,9 @@ public: ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } private: - std::string m_HelperId; - int m_ServerCount = 0; - std::vector<std::unique_ptr<ZenServerInstance> > m_Instances; + std::string m_HelperId; + int m_ServerCount = 0; + std::vector<std::unique_ptr<ZenServerInstance>> m_Instances; }; TEST_CASE("http.basics") @@ -2409,7 +2626,7 @@ OidAsString(const Oid& Id) } CbPackage -CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments) +CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; @@ -2436,10 +2653,10 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse return Package; }; -std::vector<std::pair<Oid, CompressedBuffer> > +std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) { - std::vector<std::pair<Oid, CompressedBuffer> > Result; + std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { @@ -2500,7 +2717,7 @@ TEST_CASE("project.remote") OpIds.emplace_back(Oid::NewOid()); } - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments; + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; { std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, |