diff options
| author | Stefan Boberg <[email protected]> | 2025-09-29 13:15:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-29 13:15:16 +0200 |
| commit | d4c6e547a7081b1562a69dc9839d24cb82681c5d (patch) | |
| tree | 3ffe43dcf09bb6d01c2fb860bb1f73882f44827d /src/zenserver-test/cache-tests.cpp | |
| parent | gracefully handle missing chunks when exporting an oplog (#526) (diff) | |
| download | zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.tar.xz zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.zip | |
split zenserver-test monolith into multiple source files (#528)
Diffstat (limited to 'src/zenserver-test/cache-tests.cpp')
| -rw-r--r-- | src/zenserver-test/cache-tests.cpp | 2365 |
1 files changed, 2365 insertions, 0 deletions
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp new file mode 100644 index 000000000..da0ef6b1d --- /dev/null +++ b/src/zenserver-test/cache-tests.cpp @@ -0,0 +1,2365 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#if ZEN_WITH_TESTS +# include "zenserver-test.h" +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/fmtutils.h> +# include <zenhttp/packageformat.h> +# include <zenutil/cache/cachepolicy.h> +# include <zenutil/cache/cacherequests.h> +# include <zencore/filesystem.h> +# include <zencore/stream.h> +# include <zencore/string.h> +# include <zenutil/zenserverprocess.h> +# include <zenhttp/httpclient.h> + +# include <random> + +namespace zen::tests { + +TEST_CASE("zcache.basic") +{ + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + const int kIterationCount = 100; + + auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); }; + + { + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + + const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); + const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + // Populate with some simple data + + HttpClient Http{BaseUri}; + + for (int i = 0; i < kIterationCount; ++i) + { + zen::CbObjectWriter Cbo; + Cbo << "index" << i; + + IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(HttpContentType::kCbObject); + + zen::IoHash Key = HashKey(i); + + HttpClient::Response Result = Http.Put(fmt::format("/test/{}", Key), Payload); + + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // Retrieve data + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = HashKey(i); + + HttpClient::Response Result = Http.Get(fmt::format("/test/{}", Key), {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + + // Ensure bad bucket identifiers are rejected + + { + zen::CbObjectWriter Cbo; + Cbo << "index" << 42; + + IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(HttpContentType::kCbObject); + + zen::IoHash Key = HashKey(442); + + HttpClient::Response Result = Http.Put(fmt::format("/te!st/{}", Key), Payload); + + CHECK(Result.StatusCode == HttpResponseCode::BadRequest); + } + } + + // Verify that the data persists between process runs (the previous server has exited at this point) + + { + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); + + const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + HttpClient Http{BaseUri}; + + // Retrieve data again + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = HashKey(i); + + HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", "test", Key), {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } +} + +TEST_CASE("zcache.cbpackage") +{ + using namespace std::literals; + + auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + + OutAttachmentKey = CompressedData.DecodeRawHash(); + + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); + + return Package; + }; + + auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { + std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); + std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); + + if (LhsAttachments.size() != RhsAttachments.size()) + { + return false; + } + + for (const zen::CbAttachment& LhsAttachment : LhsAttachments) + { + const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); + CHECK(RhsAttachment); + + zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); + CHECK(!LhsBuffer.IsNull()); + + zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); + CHECK(!RhsBuffer.IsNull()); + + if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) + { + return false; + } + } + + return true; + }; + + SUBCASE("PUT/GET returns correct package") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); + const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + HttpClient Http{BaseUri}; + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // PUT + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // GET + { + HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Result.ResponsePayload); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("PUT propagates upstream") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), + fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); + const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); + CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); + + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + HttpClient LocalHttp{LocalBaseUri}; + HttpClient RemoteHttp{RemoteBaseUri}; + + // Store the cache record package in the local instance + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); + + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // The cache record can be retrieved as a package from the local instance + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Result.ResponsePayload); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + + // The cache record can be retrieved as a package from the remote instance + { + HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Result.ResponsePayload); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("GET finds upstream when missing in local") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), + fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); + const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); + CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); + + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + + HttpClient LocalHttp{LocalBaseUri}; + HttpClient RemoteHttp{RemoteBaseUri}; + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in upstream cache + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); + + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // The cache record can be retrieved as a package from the local cache + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Result.ResponsePayload); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } +} + +TEST_CASE("zcache.policy") +{ + using namespace std::literals; + using namespace utils; + + auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::IoBuffer { + auto Buf = zen::UniqueBuffer::Alloc(Size); + uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); + for (uint64_t Idx = 0; Idx < Size; Idx++) + { + Data[Idx] = Idx % 256; + } + OutHash = zen::IoHash::HashBuffer(Data, Size); + return Buf.MoveToShared().AsIoBuffer(); + }; + + auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + OutAttachmentKey = CompressedData.DecodeRawHash(); + + zen::CbWriter Writer; + Writer.BeginObject("obj"sv); + Writer.AddBinaryAttachment("data", OutAttachmentKey); + Writer.EndObject(); + CbObject CacheRecord = Writer.Save().AsObject(); + + OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); + + zen::CbPackage Package; + Package.SetObject(CacheRecord); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); + + return Package; + }; + + SUBCASE("query - 'local' does not query upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + const uint16_t UpstreamPort = UpstreamCfg.Port; + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const std::string_view Bucket = "legacy"sv; + + zen::IoHash Key; + IoBuffer BinaryValue = GenerateData(1024, Key); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + { + HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = + LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::NotFound); + } + + { + HttpClient::Response Result = + LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("store - 'local' does not store upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + const uint16_t UpstreamPort = UpstreamCfg.Port; + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const auto Bucket = "legacy"sv; + + zen::IoHash Key; + IoBuffer BinaryValue = GenerateData(1024, Key); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + // Store binary cache value locally + { + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), + BinaryValue, + {{"Content-Type", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::NotFound); + } + + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const auto Bucket = "legacy"sv; + + zen::IoHash Key; + IoBuffer BinaryValue = GenerateData(1024, Key); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + // Store binary cache value locally and upstream + { + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), + BinaryValue, + {{"Content-Type", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("query - 'local' does not query upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const auto Bucket = "legacy"sv; + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + IoBuffer Buf = SerializeToBuffer(Package); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + // Store package upstream + { + HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = + LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::NotFound); + } + + { + HttpClient::Response Result = + LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("store - 'local' does not store upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const auto Bucket = "legacy"sv; + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + IoBuffer Buf = SerializeToBuffer(Package); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + // Store package locally + { + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::NotFound); + } + + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); + + const auto Bucket = "legacy"sv; + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + IoBuffer Buf = SerializeToBuffer(Package); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + + // Store package locally and upstream + { + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + { + HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + + { + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.StatusCode == HttpResponseCode::OK); + } + } + + SUBCASE("skip - 'data' returns cache record without attachments/empty payload") + { + ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Instance(TestEnv); + Cfg.Spawn(Instance); + + const auto Bucket = "test"sv; + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + IoBuffer Buf = SerializeToBuffer(Package); + + HttpClient Http{Cfg.BaseUri}; + + // Store package + { + HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // Get package + { + HttpClient::Response Result = + Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result); + CbPackage ResponsePackage; + CHECK(ResponsePackage.TryLoad(Result.ResponsePayload)); + CHECK(ResponsePackage.GetAttachments().size() == 0); + } + + // Get record + { + HttpClient::Response Result = + Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}}); + CHECK(Result); + CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload); + CHECK(ResponseObject); + } + + // Get payload + { + HttpClient::Response Result = + Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}}); + CHECK(Result); + CHECK(Result.ResponsePayload.GetSize() == 0); + } + } + + SUBCASE("skip - 'data' returns empty binary value") + { + ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Instance(TestEnv); + Cfg.Spawn(Instance); + + const auto Bucket = "test"sv; + + zen::IoHash Key; + IoBuffer BinaryValue = GenerateData(1024, Key); + + HttpClient Http{Cfg.BaseUri}; + + // Store binary cache value + { + HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); + CHECK(Result.StatusCode == HttpResponseCode::Created); + } + + // Get package + { + HttpClient::Response Result = + Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}}); + CHECK(Result); + CHECK(Result.ResponsePayload.GetSize() == 0); + } + } +} + +TEST_CASE("zcache.rpc") +{ + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint8_t> Data; + Data.resize(PayloadSize); + uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) + { + DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu); + } + if (PayloadSize & 1) + { + Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff); + } + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t PayloadSize = 1024, + size_t KeyOffset = 1, + CachePolicy PutPolicy = CachePolicy::Default, + std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + HttpClient Http{BaseUri}; + + for (uint32_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key); + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy); + OutKeys.push_back(CacheKey); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + if (OutPackages) + { + OutPackages->emplace_back(std::move(Package)); + } + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy, + zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, + int Pid = 0) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .AcceptOptions = static_cast<uint16_t>(AcceptOptions), + .ProcessPid = Pid, + .DefaultPolicy = Policy, + .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbObject); + + HttpClient Http{BaseUri}; + + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + GetCacheRecordResult OutResult; + + if (Result.StatusCode == HttpResponseCode::OK) + { + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + CHECK(!Response.IsNull()); + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + + return OutResult; + }; + + SUBCASE("get cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + + const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); + const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); + + CachePolicy Policy = CachePolicy::Default; + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record); + CHECK(Record->Key == ExpectedKey); + CHECK(Record->Values.size() == 1); + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.Body); + } + } + } + + SUBCASE("get missing cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); + const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); + + CachePolicy Policy = CachePolicy::Default; + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys; + + for (const zen::CacheKey& Key : ExistingKeys) + { + Keys.push_back(Key); + Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); + } + + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + size_t KeyIndex = 0; + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const bool Missing = Index++ % 2 != 0; + + if (Missing) + { + CHECK(!Record); + } + else + { + const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.Body); + } + } + } + } + + SUBCASE("policy - 'QueryLocal' does not query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); + + CachePolicy Policy = CachePolicy::QueryLocal; + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(!Record); + } + } + + SUBCASE("policy - 'QueryRemote' does query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); + + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + } + } + + SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + CHECK(ParsedResult.Details.empty()); + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + + HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + CHECK(Request.Requests.size() == ParsedResult.Success.size()); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + CHECK(Request.Requests.size() == ParsedResult.Details.size()); + for (const CbObjectView& Details : ParsedResult.Details) + { + CHECK(Details); + CHECK(Details["RawHash"sv].IsHash()); + CHECK(Details["RawSize"sv].IsInteger()); + CHECK(Details["Record"sv].IsObject()); + } + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and not overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and not overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize); + + for (const zen::CacheKey& CacheKey : Keys) + { + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store); + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + CHECK(ParsedResult.Details.empty()); + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize * 2); + } + } + }; + + // Check that the records are present and overwritten in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and overwritten in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient UpstreamHttp{UpstreamCfg.BaseUri}; + + size_t PayloadSize = 1024; + std::string_view Namespace("ue4.ddc"sv); + std::string_view Bucket("mastodon"sv); + const size_t NumRecords = 4; + std::vector<CbPackage> Packages; + std::vector<zen::CacheKey> Keys = + PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages); + + for (const CbPackage& Package : Packages) + { + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + cacherequests::PutCacheRecordsResult ParsedResult; + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + CHECK(!Response.IsNull()); + CHECK(ParsedResult.Parse(Response)); + for (bool ResponseSuccess : ParsedResult.Success) + { + CHECK(ResponseSuccess); + } + CHECK(ParsedResult.Details.empty()); + } + + auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) { + CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + CHECK(Value.RawSize == PayloadSize); + } + } + }; + + // Check that the records are present and unchanged in the local server + CheckRecordCorrectness(LocalCfg); + + // Check that the records are present and unchanged in the upstream server + CheckRecordCorrectness(UpstreamCfg); + } + + // TODO: Propagation for rejected PUTs + // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to + // upstream") + // { + // using namespace utils; + + // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + // ZenServerInstance UpstreamServer(TestEnv); + // SpawnServer(UpstreamServer, UpstreamCfg); + + // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, + // "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg); + + // size_t PayloadSize = 1024; + // std::string_view Namespace("ue4.ddc"sv); + // std::string_view Bucket("mastodon"sv); + // const size_t NumRecords = 4; + // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, + // CachePolicy::Local); + + // for (const zen::CacheKey& CacheKey : Keys) + // { + // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default); + + // CbPackage Package; + // CHECK(Request.Format(Package)); + + // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)}, + // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + // cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + // CHECK(Result.status_code == 200); + // cacherequests::PutCacheRecordsResult ParsedResult; + // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + // CHECK(!Response.IsNull()); + // CHECK(ParsedResult.Parse(Response)); + // for (bool ResponseSuccess : ParsedResult.Success) + // { + // CHECK(!ResponseSuccess); + // } + // } + + // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) { + // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); + // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy); + + // CHECK(Result.Result.Results.size() == Keys.size()); + + // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + // { + // CHECK(Record); + // const CacheKey& ExpectedKey = Keys[Index++]; + // CHECK(Record->Key == ExpectedKey); + // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + // { + // CHECK(Value.RawSize == ExpectedPayloadSize); + // } + // } + // }; + + // // Check that the records are present and not overwritten in the local server + // CheckRecordCorrectness(LocalCfg, PayloadSize); + + // // Check that the records are present and are the newer size in the upstream server + // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2); + // } + + SUBCASE("RpcAcceptOptions") + { + using namespace utils; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + + const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady(); + const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); + + std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); + std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); + + std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); + Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); + + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(!IsFileRef); + } + } + } + + // File path, but only for large files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File path, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + + // File handle, but only for large files + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File handle, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + } +} + +TEST_CASE("zcache.failing.upstream") +{ + // This is an exploratory test that takes a long time to run, so lets skip it by default + if (true) + { + return; + } + + using namespace std::literals; + using namespace utils; + + ZenConfig Upstream1Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Upstream1Server(TestEnv); + SpawnServer(Upstream1Server, Upstream1Cfg); + + ZenConfig Upstream2Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Upstream2Server(TestEnv); + SpawnServer(Upstream2Server, Upstream2Cfg); + + std::vector<std::uint16_t> UpstreamPorts = {Upstream1Cfg.Port, Upstream2Cfg.Port}; + ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(TestEnv.GetNewPortNumber(), UpstreamPorts, false); + LocalCfg.Args += (" --upstream-thread-count 2"); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + const uint16_t LocalPortNumber = LocalCfg.Port; + const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1Cfg.Port); + const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2Cfg.Port); + + bool Upstream1Running = true; + bool Upstream2Running = true; + + using namespace std::literals; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { + std::vector<uint32_t> Data; + Data.resize(PayloadSize / 4); + for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) + { + Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx; + } + + CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4)); + Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); + }; + + auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t KeyOffset, + size_t PayloadSize = 8192) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + zen::IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + + AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default); + OutKeys.push_back(CacheKey); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + HttpClient Http{BaseUri}; + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + if (Result.StatusCode != HttpResponseCode::OK) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); + OutKeys.clear(); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + cacherequests::GetCacheRecordsResult Result; + bool Success = false; + }; + + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = Policy, + .Namespace = std::string(Namespace)}; + for (const CacheKey& Key : Keys) + { + Request.Requests.push_back({.Key = Key}); + } + + CbObjectWriter RequestWriter; + CHECK(Request.Format(RequestWriter)); + + IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbObject); + + HttpClient Http{BaseUri}; + + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + GetCacheRecordResult OutResult; + + if (Result.StatusCode == HttpResponseCode::OK) + { + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + if (!Response.IsNull()) + { + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; + } + } + else + { + ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); + } + + return OutResult; + }; + + // Populate with some simple data + + CachePolicy Policy = CachePolicy::Default; + + const size_t ThreadCount = 128; + const size_t KeyMultiplier = 16384; + const size_t RecordsPerRequest = 64; + WorkerThreadPool Pool(ThreadCount); + + std::atomic_size_t Completed = 0; + + auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier]; + RwLock KeysLock; + + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + Pool.ScheduleWork( + [&] { + std::vector<CacheKey> NewKeys = + PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); + if (NewKeys.size() != RecordsPerRequest) + { + ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + { + RwLock::ExclusiveLockScope _(KeysLock); + Keys[Iteration].swap(NewKeys); + } + Completed.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + bool UseUpstream1 = false; + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(8000); + + if (UseUpstream1) + { + if (Upstream2Running) + { + Upstream2Server.EnableTermination(); + Upstream2Server.Shutdown(); + Sleep(100); + Upstream2Running = false; + } + if (!Upstream1Running) + { + SpawnServer(Upstream1Server, Upstream1Cfg); + Upstream1Running = true; + } + UseUpstream1 = !UseUpstream1; + } + else + { + if (Upstream1Running) + { + Upstream1Server.EnableTermination(); + Upstream1Server.Shutdown(); + Sleep(100); + Upstream1Running = false; + } + if (!Upstream2Running) + { + SpawnServer(Upstream2Server, Upstream2Cfg); + Upstream2Running = true; + } + UseUpstream1 = !UseUpstream1; + } + } + + Completed = 0; + for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) + { + size_t Iteration = I; + std::vector<CacheKey>& LocalKeys = Keys[Iteration]; + if (LocalKeys.empty()) + { + Completed.fetch_add(1); + continue; + } + Pool.ScheduleWork( + [&] { + GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); + + if (!Result.Success) + { + ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + + if (Result.Result.Results.size() != LocalKeys.size()) + { + ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); + Completed.fetch_add(1); + return; + } + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + const CacheKey& ExpectedKey = LocalKeys[Index++]; + if (!Record) + { + continue; + } + if (Record->Key != ExpectedKey) + { + continue; + } + if (Record->Values.size() != 1) + { + continue; + } + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + if (!Value.Body) + { + continue; + } + } + } + Completed.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + while (Completed < ThreadCount * KeyMultiplier) + { + Sleep(10); + } +} + +TEST_CASE("zcache.rpc.partialchunks") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance Server(TestEnv); + SpawnServer(Server, LocalCfg); + + std::vector<CompressedBuffer> Attachments; + + const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort()); + + auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey { + IoHash KeyHash; + ((size_t*)(KeyHash.Hash))[0] = KeyIndex; + return CacheKey::Create(Bucket, KeyHash); + }; + + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + const CacheKey& CacheKey, + size_t AttachmentCount, + size_t AttachmentsSize, + CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> { + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers; + std::vector<cacherequests::PutCacheRecordRequestValue> Attachments; + for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++) + { + CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize); + AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value)); + Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)}); + } + Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy}); + return AttachmentBuffers; + }; + + auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey]( + std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t KeyOffset, + size_t Num, + size_t AttachmentCount, + size_t AttachmentsSize = + 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> { + std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys; + + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; + for (size_t Key = 1; Key <= Num; ++Key) + { + const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key); + std::vector<std::pair<Oid, CompressedBuffer>> Attachments = + AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default); + Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments))); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + HttpClient Http{BaseUri}; + + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + if (Result.StatusCode != HttpResponseCode::OK) + { + ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage("")); + Keys.clear(); + } + + return Keys; + }; + + std::string_view TestBucket = "partialcachevaluetests"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u); + CHECK(RecordsWithSmallAttachments.size() == 3); + auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u); + CHECK(RecordsWithLargeAttachments.size() == 1); + + struct PartialOptions + { + uint64_t Offset = 0ull; + uint64_t Size = ~0ull; + RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone; + }; + + auto GetCacheChunk = [](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ValueId, + const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult { + cacherequests::GetCacheChunksRequest Request = { + .AcceptMagic = kCbPkgMagic, + .AcceptOptions = (uint16_t)Options.AcceptOptions, + .Namespace = std::string(Namespace), + .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}}; + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + + HttpClient Http{BaseUri}; + + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + + CHECK(Result.StatusCode == HttpResponseCode::OK); + + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); + cacherequests::GetCacheChunksResult GetCacheChunksResult; + CHECK(GetCacheChunksResult.Parse(Response)); + return GetCacheChunksResult; + }; + + auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri, + std::string_view Namespace, + const CacheKey& Key, + const Oid& ChunkId, + const CompressedBuffer& VerifyData, + const PartialOptions& Options = {}) { + cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options); + CHECK(Result.Results.size() == 1); + bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks); + if (!CanGetPartial) + { + CHECK(Result.Results[0].FragmentOffset == 0); + CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize()); + } + IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer(); + IoBuffer ReceivedDecompressed = + Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].FragmentOffset, Options.Size).AsIoBuffer(); + CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView())); + }; + + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithSmallAttachments[0].first, + RecordsWithSmallAttachments[0].second[0].first, + RecordsWithSmallAttachments[0].second[0].second, + PartialOptions{.Offset = 378, + .Size = 519, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk(BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks}); + GetAndVerifyChunk( + BaseUri, + TestNamespace, + RecordsWithLargeAttachments[0].first, + RecordsWithLargeAttachments[0].second[0].first, + RecordsWithLargeAttachments[0].second[0].second, + PartialOptions{.Offset = 1024u * 1024u, + .Size = 512u * 1024u, + .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences | + RpcAcceptOptions::kAllowPartialCacheChunks}); +} + +IoBuffer +FormatPackageBody(const CbPackage& Package) +{ + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + Body.SetContentType(HttpContentType::kCbPackage); + return Body; +} + +TEST_CASE("zcache.rpc.allpolicies") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamServer(TestEnv); + SpawnServer(UpstreamServer, UpstreamCfg); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalServer(TestEnv); + SpawnServer(LocalServer, LocalCfg); + + const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort()); + HttpClient Http{BaseUri}; + + std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; + std::string_view TestBucket = "allpoliciestest"sv; + std::string_view TestNamespace = "ue4.ddc"sv; + + // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) + // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) + constexpr int NumKeys = 256; + constexpr int NumValues = 4; + Oid ValueIds[NumValues]; + IoHash Hash; + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + ExtendableStringBuilder<16> ValueName; + ValueName << "ValueId_"sv << ValueIndex; + static_assert(sizeof(IoHash) >= sizeof(Oid)); + ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); + } + + struct KeyData; + struct UserData + { + UserData& Set(KeyData* InKeyData, int InValueIndex) + { + Data = InKeyData; + ValueIndex = InValueIndex; + return *this; + } + KeyData* Data = nullptr; + int ValueIndex = 0; + }; + struct KeyData + { + CompressedBuffer BufferValues[NumValues]; + uint64_t IntValues[NumValues]; + UserData ValueUserData[NumValues]; + bool ReceivedChunk[NumValues]; + CacheKey Key; + UserData KeyUserData; + uint32_t KeyIndex = 0; + bool GetRequestsData = true; + bool UseValueAPI = false; + bool UseValuePolicy = false; + bool ForceMiss = false; + bool UseLocal = true; + bool UseRemote = true; + bool ShouldBeHit = true; + bool ReceivedPut = false; + bool ReceivedGet = false; + bool ReceivedPutValue = false; + bool ReceivedGetValue = false; + }; + struct CachePutRequest + { + CacheKey Key; + CbObject Record; + CacheRecordPolicy Policy; + KeyData* Values; + UserData* Data; + }; + struct CachePutValueRequest + { + CacheKey Key; + CompressedBuffer Value; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetRequest + { + CacheKey Key; + CacheRecordPolicy Policy; + UserData* Data; + }; + struct CacheGetValueRequest + { + CacheKey Key; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetChunkRequest + { + CacheKey Key; + Oid ValueId; + uint64_t RawOffset; + uint64_t RawSize; + IoHash RawHash; + CachePolicy Policy; + UserData* Data; + }; + + KeyData KeyDatas[NumKeys]; + std::vector<CachePutRequest> PutRequests; + std::vector<CachePutValueRequest> PutValueRequests; + std::vector<CacheGetRequest> GetRequests; + std::vector<CacheGetValueRequest> GetValueRequests; + std::vector<CacheGetChunkRequest> ChunkRequests; + + for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) + { + IoHashStream KeyWriter; + KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); + KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); + IoHash KeyHash = KeyWriter.GetHash(); + KeyData& KeyData = KeyDatas[KeyIndex]; + + KeyData.Key = CacheKey::Create(TestBucket, KeyHash); + KeyData.KeyIndex = KeyIndex; + KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; + KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; + KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; + KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; + KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; + KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; + KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); + CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; + SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; + CachePolicy PutPolicy = SharedPolicy; + CachePolicy GetPolicy = SharedPolicy; + GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; + CacheKey& Key = KeyData.Key; + + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); + KeyData.BufferValues[ValueIndex] = + CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); + KeyData.ReceivedChunk[ValueIndex] = false; + } + + UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); + } + if (!KeyData.UseValueAPI) + { + CbObjectWriter Builder; + Builder.BeginObject("key"sv); + Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; + Builder.EndObject(); + Builder.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Builder.BeginObject(); + Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash()); + Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize()); + Builder.EndObject(); + } + Builder.EndArray(); + + CacheRecordPolicy PutRecordPolicy; + CacheRecordPolicy GetRecordPolicy; + if (!KeyData.UseValuePolicy) + { + PutRecordPolicy = CacheRecordPolicy(PutPolicy); + GetRecordPolicy = CacheRecordPolicy(GetPolicy); + } + else + { + // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies + // it will use the wrong value for SkipData and fail our tests. + CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); + CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); + GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); + } + PutRecordPolicy = PutBuilder.Build(); + GetRecordPolicy = GetBuilder.Build(); + } + if (!KeyData.ForceMiss) + { + PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); + } + GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; + ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); + } + } + else + { + if (!KeyData.ForceMiss) + { + PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); + } + GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); + ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); + } + } + + // PutCacheRecords + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(PutRequests.size()); + for (CachePutRequest& PutRequest : PutRequests) + { + cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back(); + RecordRequest.Key = PutRequest.Key; + RecordRequest.Policy = PutRequest.Policy; + RecordRequest.Values.reserve(NumValues); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]}); + } + PutRequest.Data->Data->ReceivedPut = true; + } + + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageBody(Package); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheRecords unexpectedly failed."); + } + + // PutCacheValues + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + + cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(PutValueRequests.size()); + for (CachePutValueRequest& PutRequest : PutValueRequests) + { + Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy}); + PutRequest.Data->Data->ReceivedPutValue = true; + } + + CbPackage Package; + CHECK(Request.Format(Package)); + + IoBuffer Body = FormatPackageBody(Package); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed."); + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.ForceMiss) + { + if (!KeyData.UseValueAPI) + { + CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str()); + } + else + { + CHECK_MESSAGE(KeyData.ReceivedPutValue, + WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str()); + } + } + } + + // GetCacheRecords + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + Request.Requests.reserve(GetRequests.size()); + for (CacheGetRequest& GetRequest : GetRequests) + { + Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); + } + + CbPackage Package; + CHECK(Request.Format(Package)); + IoBuffer Body = FormatPackageBody(Package); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheRecords unexpectedly failed."); + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); + cacherequests::GetCacheRecordsResult RequestResult; + CHECK(RequestResult.Parse(Response)); + CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count."); + for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results) + { + bool Succeeded = RecordResult.has_value(); + CacheGetRequest& GetRequest = GetRequests[Index++]; + KeyData* KeyData = GetRequest.Data->Data; + KeyData->ReceivedGet = true; + WriteToString<32> Name("Get(", KeyData->KeyIndex, ")"); + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str()); + } + if (!KeyData->ForceMiss && Succeeded) + { + CHECK_MESSAGE(RecordResult->Values.size() == NumValues, + WriteToString<32>(Name, " number of values did not match.").c_str()); + for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values) + { + int ExpectedValueIndex = 0; + for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) + { + if (ValueIds[ExpectedValueIndex] == Value.Id) + { + break; + } + } + CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str()); + + WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")"); + + CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; + CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(ValueName, " RawHash did not match.").c_str()); + CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(ValueName, " RawSize did not match.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = Value.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize, + WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str()); + } + } + } + } + } + + // GetCacheValues + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + + cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + GetCacheValuesRequest.Requests.reserve(GetValueRequests.size()); + for (CacheGetValueRequest& GetRequest : GetValueRequests) + { + GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); + } + + CbPackage Package; + CHECK(GetCacheValuesRequest.Format(Package)); + + IoBuffer Body = FormatPackageBody(Package); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheValues unexpectedly failed."); + IoBuffer MessageBuffer(Result.ResponsePayload); + CbPackage Response = ParsePackageMessage(MessageBuffer); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); + cacherequests::GetCacheValuesResult GetCacheValuesResult; + CHECK(GetCacheValuesResult.Parse(Response)); + for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results) + { + bool Succeeded = ValueResult.RawHash != IoHash::Zero; + CacheGetValueRequest& Request = GetValueRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + KeyData->ReceivedGetValue = true; + WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv); + + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str()); + } + if (!KeyData->ForceMiss && Succeeded) + { + CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; + CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(Name, " RawHash did not match.").c_str()); + CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(Name, " RawSize did not match.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ValueResult.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, + WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[0]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); + } + } + } + } + + // GetCacheChunks + { + std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { + return A.Key.Hash < B.Key.Hash; + }); + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic, + .DefaultPolicy = BatchDefaultPolicy, + .Namespace = std::string(TestNamespace)}; + GetCacheChunksRequest.Requests.reserve(ChunkRequests.size()); + for (CacheGetChunkRequest& ChunkRequest : ChunkRequests) + { + GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key, + .ValueId = ChunkRequest.ValueId, + .ChunkId = IoHash(), + .RawOffset = ChunkRequest.RawOffset, + .RawSize = ChunkRequest.RawSize, + .Policy = ChunkRequest.Policy}); + } + CbPackage Package; + CHECK(GetCacheChunksRequest.Format(Package)); + + IoBuffer Body = FormatPackageBody(Package); + HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed."); + CbPackage Response = ParsePackageMessage(Result.ResponsePayload); + bool Loaded = !Response.IsNull(); + CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); + cacherequests::GetCacheChunksResult GetCacheChunksResult; + CHECK(GetCacheChunksResult.Parse(Response)); + CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(), + "GetCacheChunks response count did not match request count."); + + for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results) + { + bool Succeeded = ValueResult.RawHash != IoHash::Zero; + + CacheGetChunkRequest& Request = ChunkRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; + KeyData->ReceivedChunk[ValueIndex] = true; + WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv); + + if (KeyData->ShouldBeHit) + { + CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str()); + } + else if (KeyData->ForceMiss) + { + CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str()); + } + if (KeyData->ShouldBeHit && Succeeded) + { + CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; + CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(), + WriteToString<32>(Name, " had unexpected RawHash.").c_str()); + CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(), + WriteToString<32>(Name, " had unexpected RawSize.").c_str()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ValueResult.Body.Decompress(); + CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize, + WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str()); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; + CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str()); + } + } + } + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.UseValueAPI) + { + CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + CHECK_MESSAGE( + KeyData.ReceivedChunk[ValueIndex], + WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str()); + } + } + else + { + CHECK_MESSAGE(KeyData.ReceivedGetValue, + WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + CHECK_MESSAGE(KeyData.ReceivedChunk[0], + WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str()); + } + } +} + +} // namespace zen::tests + +#endif |