aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp194
1 files changed, 194 insertions, 0 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 82b770e3c..fb5a6820b 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1801,6 +1801,200 @@ TEST_CASE("zcache.failing.upstream")
}
}
+TEST_CASE("zcache.rpc.prefetch.in.cache")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ const uint16_t RemotePortNumber = 13338;
+ ZenConfig UpstreamCfg = ZenConfig::New(RemotePortNumber);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(RemotePortNumber);
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::string_view TestBucket = "allpoliciestest"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+
+ auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy RecordPolicy) {
+ std::vector<uint32_t> Data;
+ Data.resize(PayloadSize / 4);
+ for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
+ {
+ Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
+ }
+
+ CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
+ Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord](
+ std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t Num,
+ size_t KeyOffset,
+ size_t PayloadSize = 8192) -> std::unordered_map<IoHash, std::vector<IoHash>, IoHash::Hasher> {
+ std::unordered_map<IoHash, std::vector<IoHash>, IoHash::Hasher> OutKeys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ zen::IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+ AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+ for (const auto& Attachment : Request.Requests.back().Values)
+ {
+ OutKeys[CacheKey.Hash].push_back(Attachment.RawHash);
+ }
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ if (Result.status_code != 200)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
+ return {};
+ }
+
+ return OutKeys;
+ };
+
+ struct GetCacheRecordResult
+ {
+ zen::CbPackage Response;
+ cacherequests::GetCacheRecordsResult Result;
+ bool Success = false;
+ };
+
+ auto GetCacheRecords = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::span<zen::CacheKey> Keys,
+ zen::CachePolicy Policy) -> GetCacheRecordResult {
+ cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = Policy,
+ .Namespace = std::string(Namespace)};
+ for (const CacheKey& Key : Keys)
+ {
+ Request.Requests.push_back({.Key = Key});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(Request.Format(RequestWriter));
+
+ BinaryWriter Body;
+ RequestWriter.Save(Body);
+
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ GetCacheRecordResult OutResult;
+
+ if (Result.status_code == 200)
+ {
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ if (!Response.IsNull())
+ {
+ OutResult.Response = std::move(Response);
+ CHECK(OutResult.Result.Parse(OutResult.Response));
+ OutResult.Success = true;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code);
+ }
+
+ return OutResult;
+ };
+
+ auto UpstreamKeys = PutCacheRecords(RemoteBaseUri, TestNamespace, TestBucket, 5, 0);
+
+ // Fetch the record with skip data, this should populate local cache with both record and attachments but
+ // only return the record to us.
+ {
+ cacherequests::GetCacheRecordsRequest RecordOnlyRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = CachePolicy::Default | CachePolicy::SkipData,
+ .Namespace = std::string(TestNamespace)};
+ for (const auto& It : UpstreamKeys)
+ {
+ RecordOnlyRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = It.first}});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(RecordOnlyRequest.Format(RequestWriter));
+
+ BinaryWriter Body;
+ RequestWriter.Save(Body);
+
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalBaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ cacherequests::GetCacheRecordsResult CacheRecordResult;
+ CHECK(CacheRecordResult.Parse(Response));
+ CHECK(CacheRecordResult.Results.size() == 5);
+ for (size_t I = 0; I < 5; ++I)
+ {
+ CHECK(CacheRecordResult.Results[I]->Values.size() == 1);
+ CHECK(CacheRecordResult.Results[I]->Values[0].RawHash != IoHash::Zero);
+ CHECK(CacheRecordResult.Results[I]->Values[0].RawSize != 0);
+ CHECK(!CacheRecordResult.Results[I]->Values[0].Body);
+ }
+ }
+ {
+ cacherequests::GetCacheRecordsRequest FullRecordRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = CachePolicy::Local | CachePolicy::StoreRemote,
+ .Namespace = std::string(TestNamespace)};
+ for (const auto& It : UpstreamKeys)
+ {
+ FullRecordRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = It.first}});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(FullRecordRequest.Format(RequestWriter));
+
+ BinaryWriter Body;
+ RequestWriter.Save(Body);
+
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalBaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(!Response.IsNull());
+ cacherequests::GetCacheRecordsResult CacheRecordResult;
+ CHECK(CacheRecordResult.Parse(Response));
+ CHECK(CacheRecordResult.Results.size() == 5);
+ for (size_t I = 0; I < 5; ++I)
+ {
+ CHECK(CacheRecordResult.Results[I]->Values.size() == 1);
+ CHECK(CacheRecordResult.Results[I]->Values[0].RawHash != IoHash::Zero);
+ CHECK(CacheRecordResult.Results[I]->Values[0].RawSize != 0);
+ CHECK(CacheRecordResult.Results[I]->Values[0].Body);
+ }
+ }
+}
+
TEST_CASE("zcache.rpc.allpolicies")
{
using namespace std::literals;