diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-21 10:58:28 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-21 10:58:28 +0100 |
| commit | 9b82209e3368de806a48554d85cb7c364973eb2d (patch) | |
| tree | 8b52dbe1fd5a1a3255c51198a7b31bef77cf9009 /src | |
| parent | improved process monitoring behaviour with invalid pids (#16) (diff) | |
| download | zen-9b82209e3368de806a48554d85cb7c364973eb2d.tar.xz zen-9b82209e3368de806a48554d85cb7c364973eb2d.zip | |
add support for responding with partial cache chunks (#11)
* add support for responding with partial cache chunks
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 231 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 36 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 5 | ||||
| -rw-r--r-- | src/zenutil/cache/cacherequests.cpp | 3 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cacherequests.h | 7 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/packageformat.h | 3 |
6 files changed, 268 insertions, 17 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 4a368c992..691047f90 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1771,6 +1771,223 @@ TEST_CASE("zcache.failing.upstream") } } +TEST_CASE("zcache.rpc.partialchunks") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Server(TestEnv); + SpawnServer(Server, LocalCfg); + + std::vector<CompressedBuffer> Attachments; + + const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort()); + + auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey { + IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyIndex; + return CacheKey::Create(Bucket, KeyHash); + }; + + auto CreateSemiRandomBlob = [](size_t AttachmentSize) -> CompressedBuffer { + // Convoluted way to get a compressed buffer whose result it large enough to be a separate file + // but also does actually compress + const size_t PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1; + const size_t PartSize = AttachmentSize / PartCount; + auto Part = SharedBuffer(CreateRandomBlob(PartSize)); + std::vector<SharedBuffer> Parts(PartCount, Part); + size_t RemainPartSize = AttachmentSize - (PartSize * PartCount); + if (RemainPartSize > 0) + { + Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize))); + } + CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts))); + return Value; + }; + + auto AppendCacheRecord = [&CreateSemiRandomBlob](cacherequests::PutCacheRecordsRequest& Request, + const CacheKey& CacheKey, + size_t AttachmentCount, + size_t AttachmentsSize, + CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> { + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers; + std::vector<cacherequests::PutCacheRecordRequestValue> Attachments; + for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++) + { + CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize); + AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value)); + Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)}); + } + Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy}); + return AttachmentBuffers; + }; + + auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey]( + std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t KeyOffset, + size_t Num, + size_t AttachmentCount, + size_t AttachmentsSize = + 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> { + std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key); + std::vector<std::pair<Oid, CompressedBuffer>> Attachments = + AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default); + Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments))); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + Keys.clear(); + } + + return Keys; + }; + + std::string_view TestBucket = "partialcachevaluetests"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u); + CHECK(RecordsWithSmallAttachments.size() == 3); + auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u); + CHECK(RecordsWithLargeAttachments.size() == 1); + + struct PartialOptions + { + uint64_t Offset = 0ull; + uint64_t Size = ~0ull; + RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone; + }; + + auto GetCacheChunk = [](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ValueId, + const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult { + cacherequests::GetCacheChunksRequest Request = { + .AcceptMagic = kCbPkgMagic, + .AcceptOptions = (uint16_t)Options.AcceptOptions, + .Namespace = std::string(Namespace), + .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}}; + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + CHECK(Result.status_code == 200); + + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); + cacherequests::GetCacheChunksResult GetCacheChunksResult; + CHECK(GetCacheChunksResult.Parse(Response)); + return GetCacheChunksResult; + }; + + auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ChunkId, + const CompressedBuffer& VerifyData, + const PartialOptions& Options = {}) { + cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options); + CHECK(Result.Results.size() == 1); + bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks); + if (!CanGetPartial) + { + CHECK(Result.Results[0].RawOffset == 0); + CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize()); + } + IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer(); + IoBuffer ReceivedDecompressed = + Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].RawOffset, Options.Size).AsIoBuffer(); + CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView())); + }; + + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, + .Size = 519, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences | + RpcAcceptOptions::kAllowPartialCacheChunks}); +} + TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; @@ -2326,9 +2543,9 @@ public: ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } private: - std::string m_HelperId; - int m_ServerCount = 0; - std::vector<std::unique_ptr<ZenServerInstance> > m_Instances; + std::string m_HelperId; + int m_ServerCount = 0; + std::vector<std::unique_ptr<ZenServerInstance>> m_Instances; }; TEST_CASE("http.basics") @@ -2409,7 +2626,7 @@ OidAsString(const Oid& Id) } CbPackage -CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments) +CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; @@ -2436,10 +2653,10 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse return Package; }; -std::vector<std::pair<Oid, CompressedBuffer> > +std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) { - std::vector<std::pair<Oid, CompressedBuffer> > Result; + std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { @@ -2500,7 +2717,7 @@ TEST_CASE("project.remote") OpIds.emplace_back(Oid::NewOid()); } - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments; + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; { std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6ba6629d..27bc4b016 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -229,7 +229,7 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, } else if (Method == "GetCacheChunks"sv) { - OutResultPackage = HandleRpcGetCacheChunks(Context, Object); + OutResultPackage = HandleRpcGetCacheChunks(Context, OutAcceptFlags, Object); } else { @@ -1102,7 +1102,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO } CbPackage -CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest) +CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); using namespace cache::detail; @@ -1133,7 +1133,7 @@ CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbO GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - return WriteGetCacheChunksResponse(Context, Namespace, Requests); + return WriteGetCacheChunksResponse(Context, Namespace, AcceptOptions, Requests); } bool @@ -1542,12 +1542,15 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, CbPackage CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context, std::string_view Namespace, + RpcAcceptOptions AcceptOptions, std::vector<cache::detail::ChunkRequest>& Requests) { ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse"); using namespace cache::detail; + const bool AcceptsPartialChunks = EnumHasAnyFlags(AcceptOptions, RpcAcceptOptions::kAllowPartialCacheChunks); + CbPackage RpcResponse; CbObjectWriter Writer; @@ -1563,6 +1566,33 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest Writer.AddHash("RawHash"sv, Request.Key->ChunkId); if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { + if (AcceptsPartialChunks && (Request.RequestedOffset != 0 || Request.RequestedSize < Request.RawSize)) + { + uint64_t RawOffset = 0; + if (Request.RequestedOffset > 0) + { + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + const bool bOk = Request.Value.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize); + ZEN_ASSERT(bOk); + if (BlockSize > 0) + { + RawOffset = (Request.RequestedOffset / BlockSize) * BlockSize; + } + else + { + RawOffset = Request.RequestedOffset; + } + } + // Technically the receiver can figure this offset out based on the assumption that we reply with + // a set of blocks that encapsulates the requested range, but since the UE side was not initially + // written to handle this we need to indicate that we do indeed reply with a partial response. + // And as we already need to add a field, why not provide some useful information so the client + // don't need to calculate this for us... + Writer.AddInteger("RawOffset", RawOffset); + Request.Value = Request.Value.GetRange(Request.RequestedOffset, Request.RequestedSize); + } RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId)); } else diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index c57e2818c..a98010a7c 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -104,7 +104,7 @@ private: CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); @@ -139,6 +139,7 @@ private: /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, std::string_view Namespace, + RpcAcceptOptions AcceptOptions, std::vector<cache::detail::ChunkRequest>& Requests); LoggerRef Log() { return m_Log; } @@ -152,4 +153,4 @@ private: bool AreDiskWritesAllowed() const; }; -} // namespace zen
\ No newline at end of file +} // namespace zen diff --git a/src/zenutil/cache/cacherequests.cpp b/src/zenutil/cache/cacherequests.cpp index f4de6bacd..442cf0dfc 100644 --- a/src/zenutil/cache/cacherequests.cpp +++ b/src/zenutil/cache/cacherequests.cpp @@ -792,8 +792,9 @@ namespace cacherequests { bool Succeeded = !RawHashField.HasError(); if (Succeeded) { + ValueResult.RawOffset = RecordObject["RawOffset"].AsUInt64(0); const CbAttachment* Attachment = Package.FindAttachment(ValueResult.RawHash); - ValueResult.Body = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + ValueResult.Body = Attachment ? Attachment->AsCompressedBinary().MakeOwned() : CompressedBuffer(); if (ValueResult.Body) { ValueResult.RawSize = ValueResult.Body.DecodeRawSize(); diff --git a/src/zenutil/include/zenutil/cache/cacherequests.h b/src/zenutil/include/zenutil/cache/cacherequests.h index abc5a3c13..0913efc65 100644 --- a/src/zenutil/include/zenutil/cache/cacherequests.h +++ b/src/zenutil/include/zenutil/cache/cacherequests.h @@ -195,9 +195,10 @@ namespace cacherequests { struct CacheValueResult { - uint64_t RawSize = 0; - IoHash RawHash = IoHash::Zero; - CompressedBuffer Body = CompressedBuffer::Null; + uint64_t RawSize = 0; + uint64_t RawOffset = 0; + IoHash RawHash = IoHash::Zero; + CompressedBuffer Body = CompressedBuffer::Null; }; struct CacheValuesResult diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h index 0b8495fbd..c90b840da 100644 --- a/src/zenutil/include/zenutil/packageformat.h +++ b/src/zenutil/include/zenutil/packageformat.h @@ -89,7 +89,8 @@ enum class RpcAcceptOptions : uint16_t { kNone = 0, kAllowLocalReferences = (1u << 0), - kAllowPartialLocalReferences = (1u << 1) + kAllowPartialLocalReferences = (1u << 1), + kAllowPartialCacheChunks = (1u << 2) }; gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions); |