aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-21 10:58:28 +0100
committerGitHub Enterprise <[email protected]>2024-03-21 10:58:28 +0100
commit9b82209e3368de806a48554d85cb7c364973eb2d (patch)
tree8b52dbe1fd5a1a3255c51198a7b31bef77cf9009 /src
parentimproved process monitoring behaviour with invalid pids (#16) (diff)
downloadzen-9b82209e3368de806a48554d85cb7c364973eb2d.tar.xz
zen-9b82209e3368de806a48554d85cb7c364973eb2d.zip
add support for responding with partial cache chunks (#11)
* add support for responding with partial cache chunks
Diffstat (limited to 'src')
-rw-r--r--src/zenserver-test/zenserver-test.cpp231
-rw-r--r--src/zenstore/cache/cacherpc.cpp36
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h5
-rw-r--r--src/zenutil/cache/cacherequests.cpp3
-rw-r--r--src/zenutil/include/zenutil/cache/cacherequests.h7
-rw-r--r--src/zenutil/include/zenutil/packageformat.h3
6 files changed, 268 insertions, 17 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 4a368c992..691047f90 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -1771,6 +1771,223 @@ TEST_CASE("zcache.failing.upstream")
}
}
+TEST_CASE("zcache.rpc.partialchunks")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Server(TestEnv);
+ SpawnServer(Server, LocalCfg);
+
+ std::vector<CompressedBuffer> Attachments;
+
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort());
+
+ auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey {
+ IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyIndex;
+ return CacheKey::Create(Bucket, KeyHash);
+ };
+
+ auto CreateSemiRandomBlob = [](size_t AttachmentSize) -> CompressedBuffer {
+ // Convoluted way to get a compressed buffer whose result it large enough to be a separate file
+ // but also does actually compress
+ const size_t PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1;
+ const size_t PartSize = AttachmentSize / PartCount;
+ auto Part = SharedBuffer(CreateRandomBlob(PartSize));
+ std::vector<SharedBuffer> Parts(PartCount, Part);
+ size_t RemainPartSize = AttachmentSize - (PartSize * PartCount);
+ if (RemainPartSize > 0)
+ {
+ Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize)));
+ }
+ CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts)));
+ return Value;
+ };
+
+ auto AppendCacheRecord = [&CreateSemiRandomBlob](cacherequests::PutCacheRecordsRequest& Request,
+ const CacheKey& CacheKey,
+ size_t AttachmentCount,
+ size_t AttachmentsSize,
+ CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> {
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers;
+ std::vector<cacherequests::PutCacheRecordRequestValue> Attachments;
+ for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++)
+ {
+ CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize);
+ AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value));
+ Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)});
+ }
+ Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy});
+ return AttachmentBuffers;
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey](
+ std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t KeyOffset,
+ size_t Num,
+ size_t AttachmentCount,
+ size_t AttachmentsSize =
+ 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> {
+ std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key);
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments =
+ AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default);
+ Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments)));
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ if (Result.status_code != 200)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+ Keys.clear();
+ }
+
+ return Keys;
+ };
+
+ std::string_view TestBucket = "partialcachevaluetests"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+ auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u);
+ CHECK(RecordsWithSmallAttachments.size() == 3);
+ auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u);
+ CHECK(RecordsWithLargeAttachments.size() == 1);
+
+ struct PartialOptions
+ {
+ uint64_t Offset = 0ull;
+ uint64_t Size = ~0ull;
+ RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone;
+ };
+
+ auto GetCacheChunk = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ValueId,
+ const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult {
+ cacherequests::GetCacheChunksRequest Request = {
+ .AcceptMagic = kCbPkgMagic,
+ .AcceptOptions = (uint16_t)Options.AcceptOptions,
+ .Namespace = std::string(Namespace),
+ .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}};
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
+ cacherequests::GetCacheChunksResult GetCacheChunksResult;
+ CHECK(GetCacheChunksResult.Parse(Response));
+ return GetCacheChunksResult;
+ };
+
+ auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ChunkId,
+ const CompressedBuffer& VerifyData,
+ const PartialOptions& Options = {}) {
+ cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options);
+ CHECK(Result.Results.size() == 1);
+ bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks);
+ if (!CanGetPartial)
+ {
+ CHECK(Result.Results[0].RawOffset == 0);
+ CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize());
+ }
+ IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer();
+ IoBuffer ReceivedDecompressed =
+ Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].RawOffset, Options.Size).AsIoBuffer();
+ CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView()));
+ };
+
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second);
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378,
+ .Size = 519,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences |
+ RpcAcceptOptions::kAllowPartialCacheChunks});
+}
+
TEST_CASE("zcache.rpc.allpolicies")
{
using namespace std::literals;
@@ -2326,9 +2543,9 @@ public:
ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; }
private:
- std::string m_HelperId;
- int m_ServerCount = 0;
- std::vector<std::unique_ptr<ZenServerInstance> > m_Instances;
+ std::string m_HelperId;
+ int m_ServerCount = 0;
+ std::vector<std::unique_ptr<ZenServerInstance>> m_Instances;
};
TEST_CASE("http.basics")
@@ -2409,7 +2626,7 @@ OidAsString(const Oid& Id)
}
CbPackage
-CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments)
+CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -2436,10 +2653,10 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse
return Package;
};
-std::vector<std::pair<Oid, CompressedBuffer> >
+std::vector<std::pair<Oid, CompressedBuffer>>
CreateAttachments(const std::span<const size_t>& Sizes)
{
- std::vector<std::pair<Oid, CompressedBuffer> > Result;
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
@@ -2500,7 +2717,7 @@ TEST_CASE("project.remote")
OpIds.emplace_back(Oid::NewOid());
}
- std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments;
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
{
std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269,
2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759,
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index e6ba6629d..27bc4b016 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -229,7 +229,7 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
}
else if (Method == "GetCacheChunks"sv)
{
- OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
+ OutResultPackage = HandleRpcGetCacheChunks(Context, OutAcceptFlags, Object);
}
else
{
@@ -1102,7 +1102,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
}
CbPackage
-CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
+CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
using namespace cache::detail;
@@ -1133,7 +1133,7 @@ CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbO
GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Context, Namespace, Requests);
+ return WriteGetCacheChunksResponse(Context, Namespace, AcceptOptions, Requests);
}
bool
@@ -1542,12 +1542,15 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
CbPackage
CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
std::string_view Namespace,
+ RpcAcceptOptions AcceptOptions,
std::vector<cache::detail::ChunkRequest>& Requests)
{
ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse");
using namespace cache::detail;
+ const bool AcceptsPartialChunks = EnumHasAnyFlags(AcceptOptions, RpcAcceptOptions::kAllowPartialCacheChunks);
+
CbPackage RpcResponse;
CbObjectWriter Writer;
@@ -1563,6 +1566,33 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest
Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
+ if (AcceptsPartialChunks && (Request.RequestedOffset != 0 || Request.RequestedSize < Request.RawSize))
+ {
+ uint64_t RawOffset = 0;
+ if (Request.RequestedOffset > 0)
+ {
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ const bool bOk = Request.Value.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize);
+ ZEN_ASSERT(bOk);
+ if (BlockSize > 0)
+ {
+ RawOffset = (Request.RequestedOffset / BlockSize) * BlockSize;
+ }
+ else
+ {
+ RawOffset = Request.RequestedOffset;
+ }
+ }
+ // Technically the receiver can figure this offset out based on the assumption that we reply with
+ // a set of blocks that encapsulates the requested range, but since the UE side was not initially
+ // written to handle this we need to indicate that we do indeed reply with a partial response.
+ // And as we already need to add a field, why not provide some useful information so the client
+ // don't need to calculate this for us...
+ Writer.AddInteger("RawOffset", RawOffset);
+ Request.Value = Request.Value.GetRange(Request.RequestedOffset, Request.RequestedSize);
+ }
RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
}
else
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index c57e2818c..a98010a7c 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -104,7 +104,7 @@ private:
CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
@@ -139,6 +139,7 @@ private:
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
std::string_view Namespace,
+ RpcAcceptOptions AcceptOptions,
std::vector<cache::detail::ChunkRequest>& Requests);
LoggerRef Log() { return m_Log; }
@@ -152,4 +153,4 @@ private:
bool AreDiskWritesAllowed() const;
};
-} // namespace zen \ No newline at end of file
+} // namespace zen
diff --git a/src/zenutil/cache/cacherequests.cpp b/src/zenutil/cache/cacherequests.cpp
index f4de6bacd..442cf0dfc 100644
--- a/src/zenutil/cache/cacherequests.cpp
+++ b/src/zenutil/cache/cacherequests.cpp
@@ -792,8 +792,9 @@ namespace cacherequests {
bool Succeeded = !RawHashField.HasError();
if (Succeeded)
{
+ ValueResult.RawOffset = RecordObject["RawOffset"].AsUInt64(0);
const CbAttachment* Attachment = Package.FindAttachment(ValueResult.RawHash);
- ValueResult.Body = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ ValueResult.Body = Attachment ? Attachment->AsCompressedBinary().MakeOwned() : CompressedBuffer();
if (ValueResult.Body)
{
ValueResult.RawSize = ValueResult.Body.DecodeRawSize();
diff --git a/src/zenutil/include/zenutil/cache/cacherequests.h b/src/zenutil/include/zenutil/cache/cacherequests.h
index abc5a3c13..0913efc65 100644
--- a/src/zenutil/include/zenutil/cache/cacherequests.h
+++ b/src/zenutil/include/zenutil/cache/cacherequests.h
@@ -195,9 +195,10 @@ namespace cacherequests {
struct CacheValueResult
{
- uint64_t RawSize = 0;
- IoHash RawHash = IoHash::Zero;
- CompressedBuffer Body = CompressedBuffer::Null;
+ uint64_t RawSize = 0;
+ uint64_t RawOffset = 0;
+ IoHash RawHash = IoHash::Zero;
+ CompressedBuffer Body = CompressedBuffer::Null;
};
struct CacheValuesResult
diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h
index 0b8495fbd..c90b840da 100644
--- a/src/zenutil/include/zenutil/packageformat.h
+++ b/src/zenutil/include/zenutil/packageformat.h
@@ -89,7 +89,8 @@ enum class RpcAcceptOptions : uint16_t
{
kNone = 0,
kAllowLocalReferences = (1u << 0),
- kAllowPartialLocalReferences = (1u << 1)
+ kAllowPartialLocalReferences = (1u << 1),
+ kAllowPartialCacheChunks = (1u << 2)
};
gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions);