diff options
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 82b770e3c..fb5a6820b 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1801,6 +1801,200 @@ TEST_CASE("zcache.failing.upstream") } } +TEST_CASE("zcache.rpc.prefetch.in.cache") +{ + using namespace std::literals; + using namespace utils; + + const uint16_t RemotePortNumber = 13338; + ZenConfig UpstreamCfg = ZenConfig::New(RemotePortNumber); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(RemotePortNumber); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::string_view TestBucket = "allpoliciestest"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint32_t> Data; + Data.resize(PayloadSize / 4); + for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) + { + Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; + } + + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord]( + std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t KeyOffset, + size_t PayloadSize = 8192) -> std::unordered_map<IoHash, std::vector<IoHash>, IoHash::Hasher> { + std::unordered_map<IoHash, std::vector<IoHash>, IoHash::Hasher> OutKeys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + for (const auto& Attachment : Request.Requests.back().Values) + { + OutKeys[CacheKey.Hash].push_back(Attachment.RawHash); + } + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + if (Result.status_code != 200) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason); + return {}; + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success = false; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = Policy, + .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + if (!Response.IsNull()) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + else + { + ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code); + } + + return OutResult; + }; + + auto UpstreamKeys = PutCacheRecords(RemoteBaseUri, TestNamespace, TestBucket, 5, 0); + + // Fetch the record with skip data, this should populate local cache with both record and attachments but + // only return the record to us. + { + cacherequests::GetCacheRecordsRequest RecordOnlyRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = CachePolicy::Default | CachePolicy::SkipData, + .Namespace = std::string(TestNamespace)}; + for (const auto& It : UpstreamKeys) + { + RecordOnlyRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = It.first}}); + } + + CbObjectWriter RequestWriter; + CHECK(RecordOnlyRequest.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalBaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + cacherequests::GetCacheRecordsResult CacheRecordResult; + CHECK(CacheRecordResult.Parse(Response)); + CHECK(CacheRecordResult.Results.size() == 5); + for (size_t I = 0; I < 5; ++I) + { + CHECK(CacheRecordResult.Results[I]->Values.size() == 1); + CHECK(CacheRecordResult.Results[I]->Values[0].RawHash != IoHash::Zero); + CHECK(CacheRecordResult.Results[I]->Values[0].RawSize != 0); + CHECK(!CacheRecordResult.Results[I]->Values[0].Body); + } + } + { + cacherequests::GetCacheRecordsRequest FullRecordRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = CachePolicy::Local | CachePolicy::StoreRemote, + .Namespace = std::string(TestNamespace)}; + for (const auto& It : UpstreamKeys) + { + FullRecordRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = It.first}}); + } + + CbObjectWriter RequestWriter; + CHECK(FullRecordRequest.Format(RequestWriter)); + + BinaryWriter Body; + RequestWriter.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalBaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(!Response.IsNull()); + cacherequests::GetCacheRecordsResult CacheRecordResult; + CHECK(CacheRecordResult.Parse(Response)); + CHECK(CacheRecordResult.Results.size() == 5); + for (size_t I = 0; I < 5; ++I) + { + CHECK(CacheRecordResult.Results[I]->Values.size() == 1); + CHECK(CacheRecordResult.Results[I]->Values[0].RawHash != IoHash::Zero); + CHECK(CacheRecordResult.Results[I]->Values[0].RawSize != 0); + CHECK(CacheRecordResult.Results[I]->Values[0].Body); + } + } +} + TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; |