// Copyright Epic Games, Inc. All Rights Reserved. #if ZEN_WITH_TESTS # include "zenserver-test.h" # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "cacherequests.h" # include namespace zen::tests { TEST_CASE("zcache.basic") { using namespace std::literals; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const int kIterationCount = 100; auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); // Populate with some simple data HttpClient Http{BaseUri}; for (int i = 0; i < kIterationCount; ++i) { zen::CbObjectWriter Cbo; Cbo << "index" << i; IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(HttpContentType::kCbObject); zen::IoHash Key = HashKey(i); HttpClient::Response Result = Http.Put(fmt::format("/test/{}", Key), Payload); CHECK(Result.StatusCode == HttpResponseCode::Created); } // Retrieve data for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = HashKey(i); HttpClient::Response Result = Http.Get(fmt::format("/test/{}", Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } // Ensure bad bucket identifiers are rejected { zen::CbObjectWriter Cbo; Cbo << "index" << 42; IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(HttpContentType::kCbObject); zen::IoHash Key = HashKey(442); HttpClient::Response Result = Http.Put(fmt::format("/te!st/{}", Key), Payload); CHECK(Result.StatusCode == HttpResponseCode::BadRequest); } } // Verify that the data persists between process runs (the previous server has exited at this point) { ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); HttpClient Http{BaseUri}; // Retrieve data again for (int i = 0; i < kIterationCount; ++i) { zen::IoHash Key = HashKey(i); HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", "test", Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } } TEST_CASE("zcache.cbpackage") { using namespace std::literals; auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = CompressedData.DecodeRawHash(); zen::CbWriter Obj; Obj.BeginObject("obj"sv); Obj.AddBinaryAttachment("data", OutAttachmentKey); Obj.EndObject(); zen::CbPackage Package; Package.SetObject(Obj.Save().AsObject()); Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); return Package; }; auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { std::span LhsAttachments = Lhs.GetAttachments(); std::span RhsAttachments = Rhs.GetAttachments(); if (LhsAttachments.size() != RhsAttachments.size()) { return false; } for (const zen::CbAttachment& LhsAttachment : LhsAttachments) { const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); CHECK(RhsAttachment); zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); CHECK(!LhsBuffer.IsNull()); zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); CHECK(!RhsBuffer.IsNull()); if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) { return false; } } return true; }; SUBCASE("PUT/GET returns correct package") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Instance1(TestEnv); Instance1.SetTestDir(TestDir); const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); HttpClient Http{BaseUri}; const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // PUT { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body); CHECK(Result.StatusCode == HttpResponseCode::Created); } // GET { HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); zen::CbPackage Package; const bool Ok = Package.TryLoad(Result.ResponsePayload); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("PUT propagates upstream") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); ZenServerInstance LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); HttpClient LocalHttp{LocalBaseUri}; HttpClient RemoteHttp{RemoteBaseUri}; // Store the cache record package in the local instance { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); CHECK(Result.StatusCode == HttpResponseCode::Created); } // The cache record can be retrieved as a package from the local instance { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); zen::CbPackage Package; const bool Ok = Package.TryLoad(Result.ResponsePayload); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } // The cache record can be retrieved as a package from the remote instance { HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); zen::CbPackage Package; const bool Ok = Package.TryLoad(Result.ResponsePayload); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } SUBCASE("GET finds upstream when missing in local") { // Setup local and remote server std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); ZenServerInstance RemoteInstance(TestEnv); RemoteInstance.SetTestDir(RemoteDataDir); const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); ZenServerInstance LocalInstance(TestEnv); LocalInstance.SetTestDir(LocalDataDir); LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); HttpClient LocalHttp{LocalBaseUri}; HttpClient RemoteHttp{RemoteBaseUri}; const std::string_view Bucket = "mosdef"sv; zen::IoHash Key; zen::CbPackage ExpectedPackage = CreateTestPackage(Key); // Store the cache record package in upstream cache { zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); CHECK(Result.StatusCode == HttpResponseCode::Created); } // The cache record can be retrieved as a package from the local cache { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); zen::CbPackage Package; const bool Ok = Package.TryLoad(Result.ResponsePayload); CHECK(Ok); CHECK(IsEqual(Package, ExpectedPackage)); } } } TEST_CASE("zcache.policy") { using namespace std::literals; using namespace utils; auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::IoBuffer { auto Buf = zen::UniqueBuffer::Alloc(Size); uint8_t* Data = reinterpret_cast(Buf.GetData()); for (uint64_t Idx = 0; Idx < Size; Idx++) { Data[Idx] = Idx % 256; } OutHash = zen::IoHash::HashBuffer(Data, Size); return Buf.MoveToShared().AsIoBuffer(); }; auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = CompressedData.DecodeRawHash(); zen::CbWriter Writer; Writer.BeginObject("obj"sv); Writer.AddBinaryAttachment("data", OutAttachmentKey); Writer.EndObject(); CbObject CacheRecord = Writer.Save().AsObject(); OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); zen::CbPackage Package; Package.SetObject(CacheRecord); Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); return Package; }; SUBCASE("query - 'local' does not query upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); const uint16_t UpstreamPort = UpstreamCfg.Port; ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const std::string_view Bucket = "legacy"sv; zen::IoHash Key; IoBuffer BinaryValue = GenerateData(1024, Key); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; { HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::NotFound); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("store - 'local' does not store upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); const uint16_t UpstreamPort = UpstreamCfg.Port; ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const auto Bucket = "legacy"sv; zen::IoHash Key; IoBuffer BinaryValue = GenerateData(1024, Key); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; // Store binary cache value locally { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), BinaryValue, {{"Content-Type", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::NotFound); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("store - 'local/remote' stores local and upstream (binary)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const auto Bucket = "legacy"sv; zen::IoHash Key; IoBuffer BinaryValue = GenerateData(1024, Key); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; // Store binary cache value locally and upstream { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), BinaryValue, {{"Content-Type", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("query - 'local' does not query upstream (cbpackage)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const auto Bucket = "legacy"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; // Store package upstream { HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::NotFound); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("store - 'local' does not store upstream (cbpackage)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const auto Bucket = "legacy"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; // Store package locally { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::NotFound); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") { ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamInst(TestEnv); UpstreamCfg.Spawn(UpstreamInst); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalInst(TestEnv); LocalCfg.Spawn(LocalInst); const auto Bucket = "legacy"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient RemoteHttp{UpstreamCfg.BaseUri}; // Store package locally and upstream { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); } { HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } { HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); } } SUBCASE("skip - 'data' returns cache record without attachments/empty payload") { ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance Instance(TestEnv); Cfg.Spawn(Instance); const auto Bucket = "test"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); HttpClient Http{Cfg.BaseUri}; // Store package { HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); } // Get package { HttpClient::Response Result = Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result); CbPackage ResponsePackage; CHECK(ResponsePackage.TryLoad(Result.ResponsePayload)); CHECK(ResponsePackage.GetAttachments().size() == 0); } // Get record { HttpClient::Response Result = Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}}); CHECK(Result); CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload); CHECK(ResponseObject); } // Get payload { HttpClient::Response Result = Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}}); CHECK(Result); CHECK(Result.ResponsePayload.GetSize() == 0); } } SUBCASE("skip - 'data' returns empty binary value") { ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance Instance(TestEnv); Cfg.Spawn(Instance); const auto Bucket = "test"sv; zen::IoHash Key; IoBuffer BinaryValue = GenerateData(1024, Key); HttpClient Http{Cfg.BaseUri}; // Store binary cache value { HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); CHECK(Result.StatusCode == HttpResponseCode::Created); } // Get package { HttpClient::Response Result = Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result); CHECK(Result.ResponsePayload.GetSize() == 0); } } } TEST_CASE("zcache.rpc") { using namespace std::literals; auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, const zen::CacheKey& CacheKey, size_t PayloadSize, CachePolicy RecordPolicy) { std::vector Data; Data.resize(PayloadSize); uint32_t DataSeed = *reinterpret_cast(&CacheKey.Hash.Hash[0]); uint16_t* DataPtr = reinterpret_cast(Data.data()); for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) { DataPtr[Idx] = static_cast((Idx + DataSeed) % 0xffffu); } if (PayloadSize & 1) { Data[PayloadSize - 1] = static_cast((PayloadSize - 1) & 0xff); } CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); }; auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, std::string_view Namespace, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024, size_t KeyOffset = 1, CachePolicy PutPolicy = CachePolicy::Default, std::vector* OutPackages = nullptr) -> std::vector { std::vector OutKeys; HttpClient Http{BaseUri}; for (uint32_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow(KeyOffset + Key); const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy); OutKeys.push_back(CacheKey); CbPackage Package; CHECK(Request.Format(Package)); CompositeBuffer Body(FormatPackageMessageBuffer(Package)); HttpClient::Response Result = Http.Post("/$rpc", Body, HttpContentType::kCbPackage, HttpClient::Accept(HttpContentType::kCbPackage)); CHECK(Result.StatusCode == HttpResponseCode::OK); if (OutPackages) { OutPackages->emplace_back(std::move(Package)); } } return OutKeys; }; struct GetCacheRecordResult { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; bool Success; }; auto GetCacheRecords = [](std::string_view BaseUri, std::string_view Namespace, std::span Keys, zen::CachePolicy Policy, zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, int Pid = 0) -> GetCacheRecordResult { cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .AcceptOptions = static_cast(AcceptOptions), .ProcessPid = Pid, .DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; for (const CacheKey& Key : Keys) { Request.Requests.push_back({.Key = Key}); } CbObjectWriter RequestWriter; CHECK(Request.Format(RequestWriter)); IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbObject); HttpClient Http{BaseUri}; HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); GetCacheRecordResult OutResult; if (Result.StatusCode == HttpResponseCode::OK) { CbPackage Response = ParsePackageMessage(Result.ResponsePayload); CHECK(!Response.IsNull()); OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); OutResult.Success = true; } return OutResult; }; SUBCASE("get cache records") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); CachePolicy Policy = CachePolicy::Default; std::vector Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16); GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record); CHECK(Record->Key == ExpectedKey); CHECK(Record->Values.size() == 1); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.Body); } } } SUBCASE("get missing cache records") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); CachePolicy Policy = CachePolicy::Default; std::vector ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16); std::vector Keys; for (const zen::CacheKey& Key : ExistingKeys) { Keys.push_back(Key); Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); } GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); size_t KeyIndex = 0; for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const bool Missing = Index++ % 2 != 0; if (Missing) { CHECK(!Record); } else { const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.Body); } } } } SUBCASE("policy - 'QueryLocal' does not query upstream") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = CachePolicy::QueryLocal; GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (const std::optional& Record : Result.Result.Results) { CHECK(!Record); } } SUBCASE("policy - 'QueryRemote' does query upstream") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); std::vector Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); } } SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); size_t PayloadSize = 1024; std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; for (const zen::CacheKey& CacheKey : Keys) { cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); cacherequests::PutCacheRecordsResult ParsedResult; CbPackage Response = ParsePackageMessage(Result.ResponsePayload); CHECK(!Response.IsNull()); CHECK(ParsedResult.Parse(Response)); for (bool ResponseSuccess : ParsedResult.Success) { CHECK(ResponseSuccess); } CHECK(ParsedResult.Details.empty()); } auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.RawSize == PayloadSize * 2); } } }; // Check that the records are present and overwritten in the local server CheckRecordCorrectness(LocalCfg); // Check that the records are present and overwritten in the upstream server CheckRecordCorrectness(UpstreamCfg); } SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); size_t PayloadSize = 1024; std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; for (const zen::CacheKey& CacheKey : Keys) { cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); cacherequests::PutCacheRecordsResult ParsedResult; CbPackage Response = ParsePackageMessage(Result.ResponsePayload); CHECK(!Response.IsNull()); CHECK(ParsedResult.Parse(Response)); CHECK(Request.Requests.size() == ParsedResult.Success.size()); for (bool ResponseSuccess : ParsedResult.Success) { CHECK(ResponseSuccess); } CHECK(Request.Requests.size() == ParsedResult.Details.size()); for (const CbObjectView& Details : ParsedResult.Details) { CHECK(Details); CHECK(Details["RawHash"sv].IsHash()); CHECK(Details["RawSize"sv].IsInteger()); CHECK(Details["Record"sv].IsObject()); } } auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.RawSize == PayloadSize); } } }; // Check that the records are present and not overwritten in the local server CheckRecordCorrectness(LocalCfg); // Check that the records are present and not overwritten in the upstream server CheckRecordCorrectness(UpstreamCfg); } SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; size_t PayloadSize = 1024; std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); for (const zen::CacheKey& CacheKey : Keys) { cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store); CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); cacherequests::PutCacheRecordsResult ParsedResult; CbPackage Response = ParsePackageMessage(Result.ResponsePayload); CHECK(!Response.IsNull()); CHECK(ParsedResult.Parse(Response)); for (bool ResponseSuccess : ParsedResult.Success) { CHECK(ResponseSuccess); } CHECK(ParsedResult.Details.empty()); } auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.RawSize == PayloadSize * 2); } } }; // Check that the records are present and overwritten in the local server CheckRecordCorrectness(LocalCfg); // Check that the records are present and overwritten in the upstream server CheckRecordCorrectness(UpstreamCfg); } SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") { using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); HttpClient LocalHttp{LocalCfg.BaseUri}; HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; size_t PayloadSize = 1024; std::string_view Namespace("ue4.ddc"sv); std::string_view Bucket("mastodon"sv); const size_t NumRecords = 4; std::vector Packages; std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages); for (const CbPackage& Package : Packages) { IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); cacherequests::PutCacheRecordsResult ParsedResult; CbPackage Response = ParsePackageMessage(Result.ResponsePayload); CHECK(!Response.IsNull()); CHECK(ParsedResult.Parse(Response)); for (bool ResponseSuccess : ParsedResult.Success) { CHECK(ResponseSuccess); } CHECK(ParsedResult.Details.empty()); } auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { CHECK(Value.RawSize == PayloadSize); } } }; // Check that the records are present and unchanged in the local server CheckRecordCorrectness(LocalCfg); // Check that the records are present and unchanged in the upstream server CheckRecordCorrectness(UpstreamCfg); } // TODO: Propagation for rejected PUTs // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to // upstream") // { // using namespace utils; // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); // ZenServerInstance UpstreamServer(TestEnv); // SpawnServer(UpstreamServer, UpstreamCfg); // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, // "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); // size_t PayloadSize = 1024; // std::string_view Namespace("ue4.ddc"sv); // std::string_view Bucket("mastodon"sv); // const size_t NumRecords = 4; // std::vector Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, // CachePolicy::Local); // for (const zen::CacheKey& CacheKey : Keys) // { // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); // CbPackage Package; // CHECK(Request.Format(Package)); // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, // cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); // CHECK(Result.status_code == 200); // cacherequests::PutCacheRecordsResult ParsedResult; // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); // CHECK(!Response.IsNull()); // CHECK(ParsedResult.Parse(Response)); // for (bool ResponseSuccess : ParsedResult.Success) // { // CHECK(!ResponseSuccess); // } // } // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) { // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); // CHECK(Result.Result.Results.size() == Keys.size()); // for (size_t Index = 0; const std::optional& Record : Result.Result.Results) // { // CHECK(Record); // const CacheKey& ExpectedKey = Keys[Index++]; // CHECK(Record->Key == ExpectedKey); // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) // { // CHECK(Value.RawSize == ExpectedPayloadSize); // } // } // }; // // Check that the records are present and not overwritten in the local server // CheckRecordCorrectness(LocalCfg, PayloadSize); // // Check that the records are present and are the newer size in the upstream server // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2); // } SUBCASE("RpcAcceptOptions") { using namespace utils; std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); ZenServerInstance Inst(TestEnv); Inst.SetTestDir(TestDir); const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); std::vector SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); std::vector LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 2, 1024 * 1024 * 16, SmallKeys.size()); std::vector Keys(SmallKeys.begin(), SmallKeys.end()); Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); { GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); IoBufferFileReference Ref; bool IsFileRef = Body.GetFileReference(Ref); CHECK(!IsFileRef); } } } // File path, but only for large files { GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); IoBufferFileReference Ref; bool IsFileRef = Body.GetFileReference(Ref); CHECK(IsFileRef == (Body.Size() > 1024)); } } } // File path, for all files { GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); IoBufferFileReference Ref; bool IsFileRef = Body.GetFileReference(Ref); CHECK(IsFileRef); } } } // File handle, but only for large files { GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences, GetCurrentProcessId()); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); IoBufferFileReference Ref; bool IsFileRef = Body.GetFileReference(Ref); CHECK(IsFileRef == (Body.Size() > 1024)); } } } // File handle, for all files { GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, GetCurrentProcessId()); CHECK(Result.Result.Results.size() == Keys.size()); for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { CHECK(Record); const CacheKey& ExpectedKey = Keys[Index++]; CHECK(Record->Key == ExpectedKey); for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); IoBufferFileReference Ref; bool IsFileRef = Body.GetFileReference(Ref); CHECK(IsFileRef); } } } } } TEST_CASE("zcache.failing.upstream") { // This is an exploratory test that takes a long time to run, so lets skip it by default if (true) { return; } using namespace std::literals; using namespace utils; ZenConfig Upstream1Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance Upstream1Server(TestEnv); SpawnServer(Upstream1Server, Upstream1Cfg); ZenConfig Upstream2Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance Upstream2Server(TestEnv); SpawnServer(Upstream2Server, Upstream2Cfg); std::vector UpstreamPorts = {Upstream1Cfg.Port, Upstream2Cfg.Port}; ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(TestEnv.GetNewPortNumber(), UpstreamPorts, false); LocalCfg.Args += (" --upstream-thread-count 2"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); const uint16_t LocalPortNumber = LocalCfg.Port; const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1Cfg.Port); const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2Cfg.Port); bool Upstream1Running = true; bool Upstream2Running = true; using namespace std::literals; auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, const zen::CacheKey& CacheKey, size_t PayloadSize, CachePolicy RecordPolicy) { std::vector Data; Data.resize(PayloadSize / 4); for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) { Data[Idx] = (*reinterpret_cast(&CacheKey.Hash.Hash[0])) + Idx; } CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); }; auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, std::string_view Namespace, std::string_view Bucket, size_t Num, size_t KeyOffset, size_t PayloadSize = 8192) -> std::vector { std::vector OutKeys; cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; for (size_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); OutKeys.push_back(CacheKey); } CbPackage Package; CHECK(Request.Format(Package)); HttpClient Http{BaseUri}; IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); if (Result.StatusCode != HttpResponseCode::OK) { ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); OutKeys.clear(); } return OutKeys; }; struct GetCacheRecordResult { zen::CbPackage Response; cacherequests::GetCacheRecordsResult Result; bool Success = false; }; auto GetCacheRecords = [](std::string_view BaseUri, std::string_view Namespace, std::span Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; for (const CacheKey& Key : Keys) { Request.Requests.push_back({.Key = Key}); } CbObjectWriter RequestWriter; CHECK(Request.Format(RequestWriter)); IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbObject); HttpClient Http{BaseUri}; HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); GetCacheRecordResult OutResult; if (Result.StatusCode == HttpResponseCode::OK) { CbPackage Response = ParsePackageMessage(Result.ResponsePayload); if (!Response.IsNull()) { OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); OutResult.Success = true; } } else { ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); } return OutResult; }; // Populate with some simple data CachePolicy Policy = CachePolicy::Default; const size_t ThreadCount = 128; const size_t KeyMultiplier = 16384; const size_t RecordsPerRequest = 64; WorkerThreadPool Pool(ThreadCount); std::atomic_size_t Completed = 0; auto Keys = new std::vector[ThreadCount * KeyMultiplier]; RwLock KeysLock; for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) { size_t Iteration = I; Pool.ScheduleWork( [&] { std::vector NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); if (NewKeys.size() != RecordsPerRequest) { ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); Completed.fetch_add(1); return; } { RwLock::ExclusiveLockScope _(KeysLock); Keys[Iteration].swap(NewKeys); } Completed.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } bool UseUpstream1 = false; while (Completed < ThreadCount * KeyMultiplier) { Sleep(8000); if (UseUpstream1) { if (Upstream2Running) { Upstream2Server.EnableTermination(); Upstream2Server.Shutdown(); Sleep(100); Upstream2Running = false; } if (!Upstream1Running) { SpawnServer(Upstream1Server, Upstream1Cfg); Upstream1Running = true; } UseUpstream1 = !UseUpstream1; } else { if (Upstream1Running) { Upstream1Server.EnableTermination(); Upstream1Server.Shutdown(); Sleep(100); Upstream1Running = false; } if (!Upstream2Running) { SpawnServer(Upstream2Server, Upstream2Cfg); Upstream2Running = true; } UseUpstream1 = !UseUpstream1; } } Completed = 0; for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) { size_t Iteration = I; std::vector& LocalKeys = Keys[Iteration]; if (LocalKeys.empty()) { Completed.fetch_add(1); continue; } Pool.ScheduleWork( [&] { GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); if (!Result.Success) { ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); Completed.fetch_add(1); return; } if (Result.Result.Results.size() != LocalKeys.size()) { ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); Completed.fetch_add(1); return; } for (size_t Index = 0; const std::optional& Record : Result.Result.Results) { const CacheKey& ExpectedKey = LocalKeys[Index++]; if (!Record) { continue; } if (Record->Key != ExpectedKey) { continue; } if (Record->Values.size() != 1) { continue; } for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) { if (!Value.Body) { continue; } } } Completed.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } while (Completed < ThreadCount * KeyMultiplier) { Sleep(10); } } TEST_CASE("zcache.rpc.partialchunks") { using namespace std::literals; using namespace utils; ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance Server(TestEnv); SpawnServer(Server, LocalCfg); std::vector Attachments; const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort()); auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey { IoHash KeyHash; ((size_t*)(KeyHash.Hash))[0] = KeyIndex; return CacheKey::Create(Bucket, KeyHash); }; auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, const CacheKey& CacheKey, size_t AttachmentCount, size_t AttachmentsSize, CachePolicy RecordPolicy) -> std::vector> { std::vector> AttachmentBuffers; std::vector Attachments; for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++) { CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize); AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value)); Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)}); } Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy}); return AttachmentBuffers; }; auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey]( std::string_view BaseUri, std::string_view Namespace, std::string_view Bucket, size_t KeyOffset, size_t Num, size_t AttachmentCount, size_t AttachmentsSize = 8192) -> std::vector>>> { std::vector>>> Keys; cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; for (size_t Key = 1; Key <= Num; ++Key) { const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key); std::vector> Attachments = AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default); Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments))); } CbPackage Package; CHECK(Request.Format(Package)); HttpClient Http{BaseUri}; IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); if (Result.StatusCode != HttpResponseCode::OK) { ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); Keys.clear(); } return Keys; }; std::string_view TestBucket = "partialcachevaluetests"sv; std::string_view TestNamespace = "ue4.ddc"sv; auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u); CHECK(RecordsWithSmallAttachments.size() == 3); auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u); CHECK(RecordsWithLargeAttachments.size() == 1); struct PartialOptions { uint64_t Offset = 0ull; uint64_t Size = ~0ull; RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone; }; auto GetCacheChunk = [](std::string_view BaseUri, std::string_view Namespace, const CacheKey& Key, const Oid& ValueId, const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult { cacherequests::GetCacheChunksRequest Request = { .AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)Options.AcceptOptions, .Namespace = std::string(Namespace), .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}}; CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); HttpClient Http{BaseUri}; HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.StatusCode == HttpResponseCode::OK); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); return GetCacheChunksResult; }; auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri, std::string_view Namespace, const CacheKey& Key, const Oid& ChunkId, const CompressedBuffer& VerifyData, const PartialOptions& Options = {}) { cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options); CHECK(Result.Results.size() == 1); bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks); if (!CanGetPartial) { CHECK(Result.Results[0].FragmentOffset == 0); CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize()); } IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer(); IoBuffer ReceivedDecompressed = Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].FragmentOffset, Options.Size).AsIoBuffer(); CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView())); }; GetAndVerifyChunk(BaseUri, TestNamespace, RecordsWithSmallAttachments[0].first, RecordsWithSmallAttachments[0].second[0].first, RecordsWithSmallAttachments[0].second[0].second); GetAndVerifyChunk(BaseUri, TestNamespace, RecordsWithSmallAttachments[0].first, RecordsWithSmallAttachments[0].second[0].first, RecordsWithSmallAttachments[0].second[0].second, PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); GetAndVerifyChunk( BaseUri, TestNamespace, RecordsWithSmallAttachments[0].first, RecordsWithSmallAttachments[0].second[0].first, RecordsWithSmallAttachments[0].second[0].second, PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); GetAndVerifyChunk(BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); GetAndVerifyChunk(BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u}); GetAndVerifyChunk( BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); GetAndVerifyChunk( BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks}); GetAndVerifyChunk( BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); GetAndVerifyChunk( BaseUri, TestNamespace, RecordsWithLargeAttachments[0].first, RecordsWithLargeAttachments[0].second[0].first, RecordsWithLargeAttachments[0].second[0].second, PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); } IoBuffer FormatPackageBody(const CbPackage& Package) { IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); Body.SetContentType(HttpContentType::kCbPackage); return Body; } TEST_CASE("zcache.rpc.allpolicies") { using namespace std::literals; using namespace utils; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort()); HttpClient Http{BaseUri}; std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; std::string_view TestBucket = "allpoliciestest"sv; std::string_view TestNamespace = "ue4.ddc"sv; // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) constexpr int NumKeys = 256; constexpr int NumValues = 4; Oid ValueIds[NumValues]; IoHash Hash; for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { ExtendableStringBuilder<16> ValueName; ValueName << "ValueId_"sv << ValueIndex; static_assert(sizeof(IoHash) >= sizeof(Oid)); ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); } struct KeyData; struct UserData { UserData& Set(KeyData* InKeyData, int InValueIndex) { Data = InKeyData; ValueIndex = InValueIndex; return *this; } KeyData* Data = nullptr; int ValueIndex = 0; }; struct KeyData { CompressedBuffer BufferValues[NumValues]; uint64_t IntValues[NumValues]; UserData ValueUserData[NumValues]; bool ReceivedChunk[NumValues]; CacheKey Key; UserData KeyUserData; uint32_t KeyIndex = 0; bool GetRequestsData = true; bool UseValueAPI = false; bool UseValuePolicy = false; bool ForceMiss = false; bool UseLocal = true; bool UseRemote = true; bool ShouldBeHit = true; bool ReceivedPut = false; bool ReceivedGet = false; bool ReceivedPutValue = false; bool ReceivedGetValue = false; }; struct CachePutRequest { CacheKey Key; CbObject Record; CacheRecordPolicy Policy; KeyData* Values; UserData* Data; }; struct CachePutValueRequest { CacheKey Key; CompressedBuffer Value; CachePolicy Policy; UserData* Data; }; struct CacheGetRequest { CacheKey Key; CacheRecordPolicy Policy; UserData* Data; }; struct CacheGetValueRequest { CacheKey Key; CachePolicy Policy; UserData* Data; }; struct CacheGetChunkRequest { CacheKey Key; Oid ValueId; uint64_t RawOffset; uint64_t RawSize; IoHash RawHash; CachePolicy Policy; UserData* Data; }; KeyData KeyDatas[NumKeys]; std::vector PutRequests; std::vector PutValueRequests; std::vector GetRequests; std::vector GetValueRequests; std::vector ChunkRequests; for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) { IoHashStream KeyWriter; KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); IoHash KeyHash = KeyWriter.GetHash(); KeyData& KeyData = KeyDatas[KeyIndex]; KeyData.Key = CacheKey::Create(TestBucket, KeyHash); KeyData.KeyIndex = KeyIndex; KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; CachePolicy PutPolicy = SharedPolicy; CachePolicy GetPolicy = SharedPolicy; GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; CacheKey& Key = KeyData.Key; for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { KeyData.IntValues[ValueIndex] = static_cast(KeyIndex) | (static_cast(ValueIndex) << 32); KeyData.BufferValues[ValueIndex] = CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); KeyData.ReceivedChunk[ValueIndex] = false; } UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); } if (!KeyData.UseValueAPI) { CbObjectWriter Builder; Builder.BeginObject("key"sv); Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; Builder.EndObject(); Builder.BeginArray("Values"sv); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { Builder.BeginObject(); Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash()); Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize()); Builder.EndObject(); } Builder.EndArray(); CacheRecordPolicy PutRecordPolicy; CacheRecordPolicy GetRecordPolicy; if (!KeyData.UseValuePolicy) { PutRecordPolicy = CacheRecordPolicy(PutPolicy); GetRecordPolicy = CacheRecordPolicy(GetPolicy); } else { // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies // it will use the wrong value for SkipData and fail our tests. CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); } PutRecordPolicy = PutBuilder.Build(); GetRecordPolicy = GetBuilder.Build(); } if (!KeyData.ForceMiss) { PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); } GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); } } else { if (!KeyData.ForceMiss) { PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); } GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); } } // PutCacheRecords { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; Request.Requests.reserve(PutRequests.size()); for (CachePutRequest& PutRequest : PutRequests) { cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); RecordRequest.Key = PutRequest.Key; RecordRequest.Policy = PutRequest.Policy; RecordRequest.Values.reserve(NumValues); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); } PutRequest.Data->Data->ReceivedPut = true; } CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheRecords unexpectedly failed."); } // PutCacheValues { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; Request.Requests.reserve(PutValueRequests.size()); for (CachePutValueRequest& PutRequest : PutValueRequests) { Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); PutRequest.Data->Data->ReceivedPutValue = true; } CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed."); } for (KeyData& KeyData : KeyDatas) { if (!KeyData.ForceMiss) { if (!KeyData.UseValueAPI) { CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); } else { CHECK_MESSAGE(KeyData.ReceivedPutValue, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); } } } // GetCacheRecords { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; Request.Requests.reserve(GetRequests.size()); for (CacheGetRequest& GetRequest : GetRequests) { Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); } CbPackage Package; CHECK(Request.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheRecords unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); cacherequests::GetCacheRecordsResult RequestResult; CHECK(RequestResult.Parse(Response)); CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); for (int Index = 0; const std::optional& RecordResult : RequestResult.Results) { bool Succeeded = RecordResult.has_value(); CacheGetRequest& GetRequest = GetRequests[Index++]; KeyData* KeyData = GetRequest.Data->Data; KeyData->ReceivedGet = true; WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); } if (!KeyData->ForceMiss && Succeeded) { CHECK_MESSAGE(RecordResult->Values.size() == NumValues, WriteToString<32>(Name, " number of values did not match.").c_str()); for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) { int ExpectedValueIndex = 0; for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) { if (ValueIds[ExpectedValueIndex] == Value.Id) { break; } } CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(), WriteToString<32>(ValueName, " RawHash did not match.").c_str()); CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(), WriteToString<32>(ValueName, " RawSize did not match.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = Value.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); } } } } } // GetCacheValues { CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); for (CacheGetValueRequest& GetRequest : GetValueRequests) { GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); } CbPackage Package; CHECK(GetCacheValuesRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheValues unexpectedly failed."); IoBuffer MessageBuffer(Result.ResponsePayload); CbPackage Response = ParsePackageMessage(MessageBuffer); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); cacherequests::GetCacheValuesResult GetCacheValuesResult; CHECK(GetCacheValuesResult.Parse(Response)); for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) { bool Succeeded = ValueResult.RawHash != IoHash::Zero; CacheGetValueRequest& Request = GetValueRequests[Index++]; KeyData* KeyData = Request.Data->Data; KeyData->ReceivedGetValue = true; WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); } if (!KeyData->ForceMiss && Succeeded) { CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), WriteToString<32>(Name, " RawHash did not match.").c_str()); CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), WriteToString<32>(Name, " RawSize did not match.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = ValueResult.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[0]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); } } } } // GetCacheChunks { std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { return A.Key.Hash < B.Key.Hash; }); CachePolicy BatchDefaultPolicy = CachePolicy::Default; cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) { GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, .ValueId = ChunkRequest.ValueId, .ChunkId = IoHash(), .RawOffset = ChunkRequest.RawOffset, .RawSize = ChunkRequest.RawSize, .Policy = ChunkRequest.Policy}); } CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), "GetCacheChunks response count did not match request count."); for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) { bool Succeeded = ValueResult.RawHash != IoHash::Zero; CacheGetChunkRequest& Request = ChunkRequests[Index++]; KeyData* KeyData = Request.Data->Data; int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; KeyData->ReceivedChunk[ValueIndex] = true; WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv); if (KeyData->ShouldBeHit) { CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); } else if (KeyData->ForceMiss) { CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); } if (KeyData->ShouldBeHit && Succeeded) { CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), WriteToString<32>(Name, " had unexpected RawHash.").c_str()); CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), WriteToString<32>(Name, " had unexpected RawSize.").c_str()); if (KeyData->GetRequestsData) { SharedBuffer Buffer = ValueResult.Body.Decompress(); CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); } } } } for (KeyData& KeyData : KeyDatas) { if (!KeyData.UseValueAPI) { CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) { CHECK_MESSAGE( KeyData.ReceivedChunk[ValueIndex], WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); } } else { CHECK_MESSAGE(KeyData.ReceivedGetValue, WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); CHECK_MESSAGE(KeyData.ReceivedChunk[0], WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); } } } TEST_CASE("zcache.GetChunksRanges") { using namespace std::literals; using namespace utils; std::string_view TestBucket = "allpoliciestest"sv; std::string_view TestNamespace = "ue4.ddc"sv; ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance UpstreamServer(TestEnv); SpawnServer(UpstreamServer, UpstreamCfg); ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort()); HttpClient Http{BaseUri}; const uint64_t BlobSize = 500u * 1024u; IoHash RawHash; uint64_t BlockSize = 0; { cacherequests::PutCacheValuesRequest PutRequest = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; CompressedBuffer CompressedBlob = CreateSemiRandomBlob(BlobSize); RawHash = CompressedBlob.DecodeRawHash(); OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; CHECK(CompressedBlob.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)); CHECK(BlockSize > 0); PutRequest.Requests.push_back( cacherequests::PutCacheValueRequest{.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, .Body = CompressedBlob}); CbPackage Package; CHECK(PutRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed."); } // GetCacheChunks full { cacherequests::GetCacheChunksRequest GetCacheChunksRequest = { .AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kAllowPartialCacheChunks), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; GetCacheChunksRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, //.ValueId = OutOfRangeRequest.ValueId, .ChunkId = RawHash}); CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == 1, "GetCacheChunks response count did not match request count."); CHECK_EQ(GetCacheChunksResult.Results[0].RawHash, RawHash); CHECK_EQ(GetCacheChunksResult.Results[0].FragmentHash, IoHash::Zero); CHECK_EQ(GetCacheChunksResult.Results[0].FragmentOffset, 0u); } // GetCacheChunks in range { cacherequests::GetCacheChunksRequest GetCacheChunksRequest = { .AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kAllowPartialCacheChunks), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; const uint64_t RawOffset = BlobSize / 4u; const uint64_t RawSize = BlobSize / 8u; GetCacheChunksRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, //.ValueId = OutOfRangeRequest.ValueId, .ChunkId = RawHash, .RawOffset = RawOffset, .RawSize = RawSize}); CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == 1, "GetCacheChunks response count did not match request count."); CHECK_EQ(GetCacheChunksResult.Results[0].RawHash, RawHash); CHECK_NE(GetCacheChunksResult.Results[0].FragmentHash, IoHash::Zero); CHECK_NE(GetCacheChunksResult.Results[0].FragmentHash, RawHash); const uint64_t ExpectedRawOffset = (RawOffset / BlockSize) * BlockSize; const uint64_t ExpectedRawSize = RoundUp(RawSize, BlockSize); CHECK_EQ(GetCacheChunksResult.Results[0].FragmentOffset, ExpectedRawOffset); CHECK_EQ(GetCacheChunksResult.Results[0].Body.DecodeRawSize(), ExpectedRawSize); } // GetCacheChunks partially out of bounds { cacherequests::GetCacheChunksRequest GetCacheChunksRequest = { .AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kAllowPartialCacheChunks), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; const uint64_t RawOffset = BlobSize - 512u; const uint64_t RawSize = 1u * 1024u; GetCacheChunksRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, //.ValueId = OutOfRangeRequest.ValueId, .ChunkId = RawHash, .RawOffset = RawOffset, .RawSize = RawSize}); CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == 1, "GetCacheChunks response count did not match request count."); CHECK_EQ(GetCacheChunksResult.Results[0].RawHash, RawHash); CHECK_NE(GetCacheChunksResult.Results[0].FragmentHash, IoHash::Zero); CHECK_NE(GetCacheChunksResult.Results[0].FragmentHash, RawHash); const uint64_t ExpectedRawOffset = (RawOffset / BlockSize) * BlockSize; const uint64_t ExpectedRawSize = BlobSize - ExpectedRawOffset; CHECK_EQ(GetCacheChunksResult.Results[0].FragmentOffset, ExpectedRawOffset); CHECK_EQ(GetCacheChunksResult.Results[0].Body.DecodeRawSize(), ExpectedRawSize); } // GetCacheChunks out of bounds { cacherequests::GetCacheChunksRequest GetCacheChunksRequest = { .AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kAllowPartialCacheChunks), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; GetCacheChunksRequest.Requests.push_back({.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, //.ValueId = OutOfRangeRequest.ValueId, .ChunkId = RawHash, .RawOffset = BlobSize, .RawSize = 1u * 1024u}); CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == 1, "GetCacheChunks response count did not match request count."); CHECK_EQ(GetCacheChunksResult.Results[0].RawHash, RawHash); CHECK_EQ(GetCacheChunksResult.Results[0].RawSize, 0u); CHECK_EQ(!!GetCacheChunksResult.Results[0].Body, false); CHECK_EQ(GetCacheChunksResult.Results[0].FragmentHash, IoHash::Zero); CHECK_EQ(GetCacheChunksResult.Results[0].FragmentOffset, 0u); } } TEST_CASE("zcache.batchoperations") { using namespace std::literals; using namespace utils; std::vector TestBuckets = {"batchtest1"sv, "batchtest2"sv}; std::string_view TestNamespace = "ue4.ddc"sv; ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort()); HttpClient Http{BaseUri}; const uint64_t BlobSize = 24u * 1024u; IoHash RawHash; uint64_t BlobCount = 128u; std::vector Hashes; Hashes.reserve(BlobCount); { cacherequests::PutCacheValuesRequest PutRequest = {.AcceptMagic = kCbPkgMagic, .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; PutRequest.Requests.reserve(BlobCount * TestBuckets.size()); for (const std::string_view& TestBucket : TestBuckets) { for (uint64_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { CompressedBuffer CompressedBlob = CreateSemiRandomBlob(BlobSize); RawHash = CompressedBlob.DecodeRawHash(); Hashes.push_back(RawHash); PutRequest.Requests.push_back( cacherequests::PutCacheValueRequest{.Key = {.Bucket = std::string(TestBucket), .Hash = RawHash}, .Body = CompressedBlob}); } } CbPackage Package; CHECK(PutRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed."); } { cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kNone), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; GetCacheChunksRequest.Requests.reserve(BlobCount * TestBuckets.size()); uint64_t BlobIndexOffset = 0; for (const std::string_view& TestBucket : TestBuckets) { for (uint64_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { GetCacheChunksRequest.Requests.push_back( {.Key = {.Bucket = std::string(TestBucket), .Hash = Hashes[BlobIndex + BlobIndexOffset]}, .ChunkId = Hashes[BlobIndex + BlobIndexOffset]}); } BlobIndexOffset += BlobCount; } CbPackage Package; CHECK(GetCacheChunksRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); CHECK_MESSAGE(GetCacheChunksResult.Results.size() == BlobCount * TestBuckets.size(), "GetCacheChunks response count did not match request count."); for (uint64_t BlobIndex = 0; BlobIndex < BlobCount * TestBuckets.size(); BlobIndex++) { CHECK_EQ(GetCacheChunksResult.Results[BlobIndex].RawHash, Hashes[BlobIndex]); CHECK(GetCacheChunksResult.Results[BlobIndex].Body); } } { cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, .AcceptOptions = (uint16_t)(RpcAcceptOptions::kNone), .DefaultPolicy = CachePolicy::Default, .Namespace = std::string(TestNamespace)}; GetCacheValuesRequest.Requests.reserve(BlobCount * TestBuckets.size()); uint64_t BlobIndexOffset = 0; for (const std::string_view& TestBucket : TestBuckets) { for (uint64_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { GetCacheValuesRequest.Requests.push_back(cacherequests::GetCacheValueRequest{ .Key = {.Bucket = std::string(TestBucket), .Hash = Hashes[BlobIndex + BlobIndexOffset]}}); } BlobIndexOffset += BlobCount; } CbPackage Package; CHECK(GetCacheValuesRequest.Format(Package)); IoBuffer Body = FormatPackageBody(Package); HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheValues unexpectedly failed."); CbPackage Response = ParsePackageMessage(Result.ResponsePayload); bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); cacherequests::GetCacheValuesResult GetCacheValuesResult; CHECK(GetCacheValuesResult.Parse(Response)); CHECK_MESSAGE(GetCacheValuesResult.Results.size() == BlobCount * TestBuckets.size(), "GetCacheValues response count did not match request count."); for (uint64_t BlobIndex = 0; BlobIndex < BlobCount * TestBuckets.size(); BlobIndex++) { CHECK_EQ(GetCacheValuesResult.Results[BlobIndex].RawHash, Hashes[BlobIndex]); CHECK(GetCacheValuesResult.Results[BlobIndex].Body); } } } } // namespace zen::tests #endif