aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-21 10:58:28 +0100
committerGitHub Enterprise <[email protected]>2024-03-21 10:58:28 +0100
commit9b82209e3368de806a48554d85cb7c364973eb2d (patch)
tree8b52dbe1fd5a1a3255c51198a7b31bef77cf9009 /src/zenserver-test/zenserver-test.cpp
parentimproved process monitoring behaviour with invalid pids (#16) (diff)
downloadzen-9b82209e3368de806a48554d85cb7c364973eb2d.tar.xz
zen-9b82209e3368de806a48554d85cb7c364973eb2d.zip
add support for responding with partial cache chunks (#11)
* add support for responding with partial cache chunks
Diffstat (limited to 'src/zenserver-test/zenserver-test.cpp')
-rw-r--r--src/zenserver-test/zenserver-test.cpp231
1 files changed, 224 insertions, 7 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 4a368c992..691047f90 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -1771,6 +1771,223 @@ TEST_CASE("zcache.failing.upstream")
}
}
+TEST_CASE("zcache.rpc.partialchunks")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Server(TestEnv);
+ SpawnServer(Server, LocalCfg);
+
+ std::vector<CompressedBuffer> Attachments;
+
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort());
+
+ auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey {
+ IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyIndex;
+ return CacheKey::Create(Bucket, KeyHash);
+ };
+
+ auto CreateSemiRandomBlob = [](size_t AttachmentSize) -> CompressedBuffer {
+ // Convoluted way to get a compressed buffer whose result it large enough to be a separate file
+ // but also does actually compress
+ const size_t PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1;
+ const size_t PartSize = AttachmentSize / PartCount;
+ auto Part = SharedBuffer(CreateRandomBlob(PartSize));
+ std::vector<SharedBuffer> Parts(PartCount, Part);
+ size_t RemainPartSize = AttachmentSize - (PartSize * PartCount);
+ if (RemainPartSize > 0)
+ {
+ Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize)));
+ }
+ CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts)));
+ return Value;
+ };
+
+ auto AppendCacheRecord = [&CreateSemiRandomBlob](cacherequests::PutCacheRecordsRequest& Request,
+ const CacheKey& CacheKey,
+ size_t AttachmentCount,
+ size_t AttachmentsSize,
+ CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> {
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers;
+ std::vector<cacherequests::PutCacheRecordRequestValue> Attachments;
+ for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++)
+ {
+ CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize);
+ AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value));
+ Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)});
+ }
+ Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy});
+ return AttachmentBuffers;
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey](
+ std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t KeyOffset,
+ size_t Num,
+ size_t AttachmentCount,
+ size_t AttachmentsSize =
+ 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> {
+ std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key);
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments =
+ AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default);
+ Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments)));
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ if (Result.status_code != 200)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+ Keys.clear();
+ }
+
+ return Keys;
+ };
+
+ std::string_view TestBucket = "partialcachevaluetests"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+ auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u);
+ CHECK(RecordsWithSmallAttachments.size() == 3);
+ auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u);
+ CHECK(RecordsWithLargeAttachments.size() == 1);
+
+ struct PartialOptions
+ {
+ uint64_t Offset = 0ull;
+ uint64_t Size = ~0ull;
+ RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone;
+ };
+
+ auto GetCacheChunk = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ValueId,
+ const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult {
+ cacherequests::GetCacheChunksRequest Request = {
+ .AcceptMagic = kCbPkgMagic,
+ .AcceptOptions = (uint16_t)Options.AcceptOptions,
+ .Namespace = std::string(Namespace),
+ .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}};
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ CHECK(Result.status_code == 200);
+
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
+ cacherequests::GetCacheChunksResult GetCacheChunksResult;
+ CHECK(GetCacheChunksResult.Parse(Response));
+ return GetCacheChunksResult;
+ };
+
+ auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ChunkId,
+ const CompressedBuffer& VerifyData,
+ const PartialOptions& Options = {}) {
+ cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options);
+ CHECK(Result.Results.size() == 1);
+ bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks);
+ if (!CanGetPartial)
+ {
+ CHECK(Result.Results[0].RawOffset == 0);
+ CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize());
+ }
+ IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer();
+ IoBuffer ReceivedDecompressed =
+ Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].RawOffset, Options.Size).AsIoBuffer();
+ CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView()));
+ };
+
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second);
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378,
+ .Size = 519,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences |
+ RpcAcceptOptions::kAllowPartialCacheChunks});
+}
+
TEST_CASE("zcache.rpc.allpolicies")
{
using namespace std::literals;
@@ -2326,9 +2543,9 @@ public:
ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; }
private:
- std::string m_HelperId;
- int m_ServerCount = 0;
- std::vector<std::unique_ptr<ZenServerInstance> > m_Instances;
+ std::string m_HelperId;
+ int m_ServerCount = 0;
+ std::vector<std::unique_ptr<ZenServerInstance>> m_Instances;
};
TEST_CASE("http.basics")
@@ -2409,7 +2626,7 @@ OidAsString(const Oid& Id)
}
CbPackage
-CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments)
+CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -2436,10 +2653,10 @@ CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, Compresse
return Package;
};
-std::vector<std::pair<Oid, CompressedBuffer> >
+std::vector<std::pair<Oid, CompressedBuffer>>
CreateAttachments(const std::span<const size_t>& Sizes)
{
- std::vector<std::pair<Oid, CompressedBuffer> > Result;
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
@@ -2500,7 +2717,7 @@ TEST_CASE("project.remote")
OpIds.emplace_back(Oid::NewOid());
}
- std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments;
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
{
std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269,
2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759,