aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/cache-tests.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-09-29 13:15:16 +0200
committerGitHub Enterprise <[email protected]>2025-09-29 13:15:16 +0200
commitd4c6e547a7081b1562a69dc9839d24cb82681c5d (patch)
tree3ffe43dcf09bb6d01c2fb860bb1f73882f44827d /src/zenserver-test/cache-tests.cpp
parentgracefully handle missing chunks when exporting an oplog (#526) (diff)
downloadzen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.tar.xz
zen-d4c6e547a7081b1562a69dc9839d24cb82681c5d.zip
split zenserver-test monolith into multiple source files (#528)
Diffstat (limited to 'src/zenserver-test/cache-tests.cpp')
-rw-r--r--src/zenserver-test/cache-tests.cpp2365
1 files changed, 2365 insertions, 0 deletions
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
new file mode 100644
index 000000000..da0ef6b1d
--- /dev/null
+++ b/src/zenserver-test/cache-tests.cpp
@@ -0,0 +1,2365 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#if ZEN_WITH_TESTS
+# include "zenserver-test.h"
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/fmtutils.h>
+# include <zenhttp/packageformat.h>
+# include <zenutil/cache/cachepolicy.h>
+# include <zenutil/cache/cacherequests.h>
+# include <zencore/filesystem.h>
+# include <zencore/stream.h>
+# include <zencore/string.h>
+# include <zenutil/zenserverprocess.h>
+# include <zenhttp/httpclient.h>
+
+# include <random>
+
+namespace zen::tests {
+
+TEST_CASE("zcache.basic")
+{
+ using namespace std::literals;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ const int kIterationCount = 100;
+
+ auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); };
+
+ {
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+
+ const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+ // Populate with some simple data
+
+ HttpClient Http{BaseUri};
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::CbObjectWriter Cbo;
+ Cbo << "index" << i;
+
+ IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(HttpContentType::kCbObject);
+
+ zen::IoHash Key = HashKey(i);
+
+ HttpClient::Response Result = Http.Put(fmt::format("/test/{}", Key), Payload);
+
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // Retrieve data
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::IoHash Key = HashKey(i);
+
+ HttpClient::Response Result = Http.Get(fmt::format("/test/{}", Key), {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Ensure bad bucket identifiers are rejected
+
+ {
+ zen::CbObjectWriter Cbo;
+ Cbo << "index" << 42;
+
+ IoBuffer Payload = Cbo.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(HttpContentType::kCbObject);
+
+ zen::IoHash Key = HashKey(442);
+
+ HttpClient::Response Result = Http.Put(fmt::format("/te!st/{}", Key), Payload);
+
+ CHECK(Result.StatusCode == HttpResponseCode::BadRequest);
+ }
+ }
+
+ // Verify that the data persists between process runs (the previous server has exited at this point)
+
+ {
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
+
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+ HttpClient Http{BaseUri};
+
+ // Retrieve data again
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ zen::IoHash Key = HashKey(i);
+
+ HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", "test", Key), {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+}
+
+TEST_CASE("zcache.cbpackage")
+{
+ using namespace std::literals;
+
+ auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+
+ OutAttachmentKey = CompressedData.DecodeRawHash();
+
+ zen::CbWriter Obj;
+ Obj.BeginObject("obj"sv);
+ Obj.AddBinaryAttachment("data", OutAttachmentKey);
+ Obj.EndObject();
+
+ zen::CbPackage Package;
+ Package.SetObject(Obj.Save().AsObject());
+ Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
+
+ return Package;
+ };
+
+ auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
+ std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
+ std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();
+
+ if (LhsAttachments.size() != RhsAttachments.size())
+ {
+ return false;
+ }
+
+ for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
+ {
+ const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
+ CHECK(RhsAttachment);
+
+ zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
+ CHECK(!LhsBuffer.IsNull());
+
+ zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
+ CHECK(!RhsBuffer.IsNull());
+
+ if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ };
+
+ SUBCASE("PUT/GET returns correct package")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+ HttpClient Http{BaseUri};
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // PUT
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // GET
+ {
+ HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Result.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("PUT propagates upstream")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
+ fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
+ const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
+ CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
+
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ HttpClient LocalHttp{LocalBaseUri};
+ HttpClient RemoteHttp{RemoteBaseUri};
+
+ // Store the cache record package in the local instance
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
+
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // The cache record can be retrieved as a package from the local instance
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Result.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+
+ // The cache record can be retrieved as a package from the remote instance
+ {
+ HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Result.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("GET finds upstream when missing in local")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
+ fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
+ const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
+ CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
+
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+
+ HttpClient LocalHttp{LocalBaseUri};
+ HttpClient RemoteHttp{RemoteBaseUri};
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // Store the cache record package in upstream cache
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
+
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // The cache record can be retrieved as a package from the local cache
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Result.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+}
+
+TEST_CASE("zcache.policy")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::IoBuffer {
+ auto Buf = zen::UniqueBuffer::Alloc(Size);
+ uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData());
+ for (uint64_t Idx = 0; Idx < Size; Idx++)
+ {
+ Data[Idx] = Idx % 256;
+ }
+ OutHash = zen::IoHash::HashBuffer(Data, Size);
+ return Buf.MoveToShared().AsIoBuffer();
+ };
+
+ auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+ OutAttachmentKey = CompressedData.DecodeRawHash();
+
+ zen::CbWriter Writer;
+ Writer.BeginObject("obj"sv);
+ Writer.AddBinaryAttachment("data", OutAttachmentKey);
+ Writer.EndObject();
+ CbObject CacheRecord = Writer.Save().AsObject();
+
+ OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView());
+
+ zen::CbPackage Package;
+ Package.SetObject(CacheRecord);
+ Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
+
+ return Package;
+ };
+
+ SUBCASE("query - 'local' does not query upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+ const uint16_t UpstreamPort = UpstreamCfg.Port;
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const std::string_view Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ IoBuffer BinaryValue = GenerateData(1024, Key);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ {
+ HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result =
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::NotFound);
+ }
+
+ {
+ HttpClient::Response Result =
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("store - 'local' does not store upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+ const uint16_t UpstreamPort = UpstreamCfg.Port;
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const auto Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ IoBuffer BinaryValue = GenerateData(1024, Key);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ // Store binary cache value locally
+ {
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key),
+ BinaryValue,
+ {{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::NotFound);
+ }
+
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("store - 'local/remote' stores local and upstream (binary)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const auto Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ IoBuffer BinaryValue = GenerateData(1024, Key);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ // Store binary cache value locally and upstream
+ {
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key),
+ BinaryValue,
+ {{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("query - 'local' does not query upstream (cbpackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const auto Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ IoBuffer Buf = SerializeToBuffer(Package);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ // Store package upstream
+ {
+ HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result =
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=QueryLocal,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::NotFound);
+ }
+
+ {
+ HttpClient::Response Result =
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("store - 'local' does not store upstream (cbpackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const auto Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ IoBuffer Buf = SerializeToBuffer(Package);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ // Store package locally
+ {
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::NotFound);
+ }
+
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
+ {
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
+
+ const auto Bucket = "legacy"sv;
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ IoBuffer Buf = SerializeToBuffer(Package);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+
+ // Store package locally and upstream
+ {
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ {
+ HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+
+ {
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ }
+ }
+
+ SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
+ {
+ ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Instance(TestEnv);
+ Cfg.Spawn(Instance);
+
+ const auto Bucket = "test"sv;
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ IoBuffer Buf = SerializeToBuffer(Package);
+
+ HttpClient Http{Cfg.BaseUri};
+
+ // Store package
+ {
+ HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // Get package
+ {
+ HttpClient::Response Result =
+ Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result);
+ CbPackage ResponsePackage;
+ CHECK(ResponsePackage.TryLoad(Result.ResponsePayload));
+ CHECK(ResponsePackage.GetAttachments().size() == 0);
+ }
+
+ // Get record
+ {
+ HttpClient::Response Result =
+ Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}});
+ CHECK(Result);
+ CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload);
+ CHECK(ResponseObject);
+ }
+
+ // Get payload
+ {
+ HttpClient::Response Result =
+ Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}});
+ CHECK(Result);
+ CHECK(Result.ResponsePayload.GetSize() == 0);
+ }
+ }
+
+ SUBCASE("skip - 'data' returns empty binary value")
+ {
+ ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Instance(TestEnv);
+ Cfg.Spawn(Instance);
+
+ const auto Bucket = "test"sv;
+
+ zen::IoHash Key;
+ IoBuffer BinaryValue = GenerateData(1024, Key);
+
+ HttpClient Http{Cfg.BaseUri};
+
+ // Store binary cache value
+ {
+ HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
+ CHECK(Result.StatusCode == HttpResponseCode::Created);
+ }
+
+ // Get package
+ {
+ HttpClient::Response Result =
+ Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ CHECK(Result);
+ CHECK(Result.ResponsePayload.GetSize() == 0);
+ }
+ }
+}
+
+TEST_CASE("zcache.rpc")
+{
+ using namespace std::literals;
+
+ auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy RecordPolicy) {
+ std::vector<uint8_t> Data;
+ Data.resize(PayloadSize);
+ uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]);
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx)
+ {
+ DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu);
+ }
+ if (PayloadSize & 1)
+ {
+ Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff);
+ }
+ CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t Num,
+ size_t PayloadSize = 1024,
+ size_t KeyOffset = 1,
+ CachePolicy PutPolicy = CachePolicy::Default,
+ std::vector<CbPackage>* OutPackages = nullptr) -> std::vector<CacheKey> {
+ std::vector<zen::CacheKey> OutKeys;
+
+ HttpClient Http{BaseUri};
+
+ for (uint32_t Key = 1; Key <= Num; ++Key)
+ {
+ zen::IoHash KeyHash;
+ ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key);
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize, PutPolicy);
+ OutKeys.push_back(CacheKey);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ if (OutPackages)
+ {
+ OutPackages->emplace_back(std::move(Package));
+ }
+ }
+
+ return OutKeys;
+ };
+
+ struct GetCacheRecordResult
+ {
+ zen::CbPackage Response;
+ cacherequests::GetCacheRecordsResult Result;
+ bool Success;
+ };
+
+ auto GetCacheRecords = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::span<zen::CacheKey> Keys,
+ zen::CachePolicy Policy,
+ zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone,
+ int Pid = 0) -> GetCacheRecordResult {
+ cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .AcceptOptions = static_cast<uint16_t>(AcceptOptions),
+ .ProcessPid = Pid,
+ .DefaultPolicy = Policy,
+ .Namespace = std::string(Namespace)};
+ for (const CacheKey& Key : Keys)
+ {
+ Request.Requests.push_back({.Key = Key});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(Request.Format(RequestWriter));
+
+ IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbObject);
+
+ HttpClient Http{BaseUri};
+
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ GetCacheRecordResult OutResult;
+
+ if (Result.StatusCode == HttpResponseCode::OK)
+ {
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ CHECK(!Response.IsNull());
+ OutResult.Response = std::move(Response);
+ CHECK(OutResult.Result.Parse(OutResult.Response));
+ OutResult.Success = true;
+ }
+
+ return OutResult;
+ };
+
+ SUBCASE("get cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+
+ const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady();
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
+
+ CachePolicy Policy = CachePolicy::Default;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record);
+ CHECK(Record->Key == ExpectedKey);
+ CHECK(Record->Values.size() == 1);
+
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.Body);
+ }
+ }
+ }
+
+ SUBCASE("get missing cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady();
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
+
+ CachePolicy Policy = CachePolicy::Default;
+ std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> Keys;
+
+ for (const zen::CacheKey& Key : ExistingKeys)
+ {
+ Keys.push_back(Key);
+ Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero));
+ }
+
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ size_t KeyIndex = 0;
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ const bool Missing = Index++ % 2 != 0;
+
+ if (Missing)
+ {
+ CHECK(!Record);
+ }
+ else
+ {
+ const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.Body);
+ }
+ }
+ }
+ }
+
+ SUBCASE("policy - 'QueryLocal' does not query upstream")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
+
+ CachePolicy Policy = CachePolicy::QueryLocal;
+ GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(!Record);
+ }
+ }
+
+ SUBCASE("policy - 'QueryRemote' does query upstream")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);
+
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ }
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with differing value when not limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient UpstreamHttp{UpstreamCfg.BaseUri};
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+ HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ CHECK(ParsedResult.Details.empty());
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient UpstreamHttp{UpstreamCfg.BaseUri};
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+
+ HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ CHECK(Request.Requests.size() == ParsedResult.Success.size());
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ CHECK(Request.Requests.size() == ParsedResult.Details.size());
+ for (const CbObjectView& Details : ParsedResult.Details)
+ {
+ CHECK(Details);
+ CHECK(Details["RawHash"sv].IsHash());
+ CHECK(Details["RawSize"sv].IsInteger());
+ CHECK(Details["Record"sv].IsObject());
+ }
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and not overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and not overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - no 'QueryLocal' on put allows overwrite with differing value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient UpstreamHttp{UpstreamCfg.BaseUri};
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize);
+
+ for (const zen::CacheKey& CacheKey : Keys)
+ {
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Store);
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+ HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ CHECK(ParsedResult.Details.empty());
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize * 2);
+ }
+ }
+ };
+
+ // Check that the records are present and overwritten in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and overwritten in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ SUBCASE("policy - 'QueryLocal' on put allows overwrite with equivalent value when limiting overwrites")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port, "--cache-bucket-limit-overwrites");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient UpstreamHttp{UpstreamCfg.BaseUri};
+
+ size_t PayloadSize = 1024;
+ std::string_view Namespace("ue4.ddc"sv);
+ std::string_view Bucket("mastodon"sv);
+ const size_t NumRecords = 4;
+ std::vector<CbPackage> Packages;
+ std::vector<zen::CacheKey> Keys =
+ PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1, CachePolicy::Default, &Packages);
+
+ for (const CbPackage& Package : Packages)
+ {
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+ HttpClient::Response Result = LocalHttp.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ cacherequests::PutCacheRecordsResult ParsedResult;
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ CHECK(!Response.IsNull());
+ CHECK(ParsedResult.Parse(Response));
+ for (bool ResponseSuccess : ParsedResult.Success)
+ {
+ CHECK(ResponseSuccess);
+ }
+ CHECK(ParsedResult.Details.empty());
+ }
+
+ auto CheckRecordCorrectness = [&](const ZenConfig& Cfg) {
+ CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ CHECK(Value.RawSize == PayloadSize);
+ }
+ }
+ };
+
+ // Check that the records are present and unchanged in the local server
+ CheckRecordCorrectness(LocalCfg);
+
+ // Check that the records are present and unchanged in the upstream server
+ CheckRecordCorrectness(UpstreamCfg);
+ }
+
+ // TODO: Propagation for rejected PUTs
+ // SUBCASE("policy - 'QueryLocal' on put denies overwrite with differing value when limiting overwrites but allows propagation to
+ // upstream")
+ // {
+ // using namespace utils;
+
+ // ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ // ZenServerInstance UpstreamServer(TestEnv);
+ // SpawnServer(UpstreamServer, UpstreamCfg);
+
+ // ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port,
+ // "--cache-bucket-limit-overwrites"); ZenServerInstance LocalServer(TestEnv); SpawnServer(LocalServer, LocalCfg);
+
+ // size_t PayloadSize = 1024;
+ // std::string_view Namespace("ue4.ddc"sv);
+ // std::string_view Bucket("mastodon"sv);
+ // const size_t NumRecords = 4;
+ // std::vector<zen::CacheKey> Keys = PutCacheRecords(LocalCfg.BaseUri, Namespace, Bucket, NumRecords, PayloadSize, 1,
+ // CachePolicy::Local);
+
+ // for (const zen::CacheKey& CacheKey : Keys)
+ // {
+ // cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ // AppendCacheRecord(Request, CacheKey, PayloadSize * 2, CachePolicy::Default);
+
+ // CbPackage Package;
+ // CHECK(Request.Format(Package));
+
+ // IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ // cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", LocalCfg.BaseUri)},
+ // cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ // cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ // CHECK(Result.status_code == 200);
+ // cacherequests::PutCacheRecordsResult ParsedResult;
+ // CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ // CHECK(!Response.IsNull());
+ // CHECK(ParsedResult.Parse(Response));
+ // for (bool ResponseSuccess : ParsedResult.Success)
+ // {
+ // CHECK(!ResponseSuccess);
+ // }
+ // }
+
+ // auto CheckRecordCorrectness = [&](const ZenConfig& Cfg, size_t ExpectedPayloadSize) {
+ // CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ // GetCacheRecordResult Result = GetCacheRecords(Cfg.BaseUri, "ue4.ddc"sv, Keys, Policy);
+
+ // CHECK(Result.Result.Results.size() == Keys.size());
+
+ // for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ // {
+ // CHECK(Record);
+ // const CacheKey& ExpectedKey = Keys[Index++];
+ // CHECK(Record->Key == ExpectedKey);
+ // for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ // {
+ // CHECK(Value.RawSize == ExpectedPayloadSize);
+ // }
+ // }
+ // };
+
+ // // Check that the records are present and not overwritten in the local server
+ // CheckRecordCorrectness(LocalCfg, PayloadSize);
+
+ // // Check that the records are present and are the newer size in the upstream server
+ // CheckRecordCorrectness(UpstreamCfg, PayloadSize*2);
+ // }
+
+ SUBCASE("RpcAcceptOptions")
+ {
+ using namespace utils;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+
+ const uint16_t BasePort = Inst.SpawnServerAndWaitUntilReady();
+ const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
+
+ std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
+ std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());
+
+ std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
+ Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());
+
+ {
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(!IsFileRef);
+ }
+ }
+ }
+
+ // File path, but only for large files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef == (Body.Size() > 1024));
+ }
+ }
+ }
+
+ // File path, for all files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef);
+ }
+ }
+ }
+
+ // File handle, but only for large files
+ {
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences,
+ GetCurrentProcessId());
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef == (Body.Size() > 1024));
+ }
+ }
+ }
+
+ // File handle, for all files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences,
+ GetCurrentProcessId());
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef);
+ }
+ }
+ }
+ }
+}
+
+TEST_CASE("zcache.failing.upstream")
+{
+ // This is an exploratory test that takes a long time to run, so lets skip it by default
+ if (true)
+ {
+ return;
+ }
+
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig Upstream1Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Upstream1Server(TestEnv);
+ SpawnServer(Upstream1Server, Upstream1Cfg);
+
+ ZenConfig Upstream2Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Upstream2Server(TestEnv);
+ SpawnServer(Upstream2Server, Upstream2Cfg);
+
+ std::vector<std::uint16_t> UpstreamPorts = {Upstream1Cfg.Port, Upstream2Cfg.Port};
+ ZenConfig LocalCfg = ZenConfig::NewWithThreadedUpstreams(TestEnv.GetNewPortNumber(), UpstreamPorts, false);
+ LocalCfg.Args += (" --upstream-thread-count 2");
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ const uint16_t LocalPortNumber = LocalCfg.Port;
+ const auto LocalUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto Upstream1Uri = fmt::format("http://localhost:{}/z$", Upstream1Cfg.Port);
+ const auto Upstream2Uri = fmt::format("http://localhost:{}/z$", Upstream2Cfg.Port);
+
+ bool Upstream1Running = true;
+ bool Upstream2Running = true;
+
+ using namespace std::literals;
+
+ auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy RecordPolicy) {
+ std::vector<uint32_t> Data;
+ Data.resize(PayloadSize / 4);
+ for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
+ {
+ Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
+ }
+
+ CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
+ Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t Num,
+ size_t KeyOffset,
+ size_t PayloadSize = 8192) -> std::vector<CacheKey> {
+ std::vector<zen::CacheKey> OutKeys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ zen::IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+
+ AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
+ OutKeys.push_back(CacheKey);
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ HttpClient Http{BaseUri};
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ if (Result.StatusCode != HttpResponseCode::OK)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage(""));
+ OutKeys.clear();
+ }
+
+ return OutKeys;
+ };
+
+ struct GetCacheRecordResult
+ {
+ zen::CbPackage Response;
+ cacherequests::GetCacheRecordsResult Result;
+ bool Success = false;
+ };
+
+ auto GetCacheRecords = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ std::span<zen::CacheKey> Keys,
+ zen::CachePolicy Policy) -> GetCacheRecordResult {
+ cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = Policy,
+ .Namespace = std::string(Namespace)};
+ for (const CacheKey& Key : Keys)
+ {
+ Request.Requests.push_back({.Key = Key});
+ }
+
+ CbObjectWriter RequestWriter;
+ CHECK(Request.Format(RequestWriter));
+
+ IoBuffer Body = RequestWriter.Save().GetBuffer().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbObject);
+
+ HttpClient Http{BaseUri};
+
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ GetCacheRecordResult OutResult;
+
+ if (Result.StatusCode == HttpResponseCode::OK)
+ {
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ if (!Response.IsNull())
+ {
+ OutResult.Response = std::move(Response);
+ CHECK(OutResult.Result.Parse(OutResult.Response));
+ OutResult.Success = true;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage(""));
+ }
+
+ return OutResult;
+ };
+
+ // Populate with some simple data
+
+ CachePolicy Policy = CachePolicy::Default;
+
+ const size_t ThreadCount = 128;
+ const size_t KeyMultiplier = 16384;
+ const size_t RecordsPerRequest = 64;
+ WorkerThreadPool Pool(ThreadCount);
+
+ std::atomic_size_t Completed = 0;
+
+ auto Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier];
+ RwLock KeysLock;
+
+ for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+ {
+ size_t Iteration = I;
+ Pool.ScheduleWork(
+ [&] {
+ std::vector<CacheKey> NewKeys =
+ PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
+ if (NewKeys.size() != RecordsPerRequest)
+ {
+ ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+ {
+ RwLock::ExclusiveLockScope _(KeysLock);
+ Keys[Iteration].swap(NewKeys);
+ }
+ Completed.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ bool UseUpstream1 = false;
+ while (Completed < ThreadCount * KeyMultiplier)
+ {
+ Sleep(8000);
+
+ if (UseUpstream1)
+ {
+ if (Upstream2Running)
+ {
+ Upstream2Server.EnableTermination();
+ Upstream2Server.Shutdown();
+ Sleep(100);
+ Upstream2Running = false;
+ }
+ if (!Upstream1Running)
+ {
+ SpawnServer(Upstream1Server, Upstream1Cfg);
+ Upstream1Running = true;
+ }
+ UseUpstream1 = !UseUpstream1;
+ }
+ else
+ {
+ if (Upstream1Running)
+ {
+ Upstream1Server.EnableTermination();
+ Upstream1Server.Shutdown();
+ Sleep(100);
+ Upstream1Running = false;
+ }
+ if (!Upstream2Running)
+ {
+ SpawnServer(Upstream2Server, Upstream2Cfg);
+ Upstream2Running = true;
+ }
+ UseUpstream1 = !UseUpstream1;
+ }
+ }
+
+ Completed = 0;
+ for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
+ {
+ size_t Iteration = I;
+ std::vector<CacheKey>& LocalKeys = Keys[Iteration];
+ if (LocalKeys.empty())
+ {
+ Completed.fetch_add(1);
+ continue;
+ }
+ Pool.ScheduleWork(
+ [&] {
+ GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
+
+ if (!Result.Success)
+ {
+ ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+
+ if (Result.Result.Results.size() != LocalKeys.size())
+ {
+ ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ const CacheKey& ExpectedKey = LocalKeys[Index++];
+ if (!Record)
+ {
+ continue;
+ }
+ if (Record->Key != ExpectedKey)
+ {
+ continue;
+ }
+ if (Record->Values.size() != 1)
+ {
+ continue;
+ }
+
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ if (!Value.Body)
+ {
+ continue;
+ }
+ }
+ }
+ Completed.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ while (Completed < ThreadCount * KeyMultiplier)
+ {
+ Sleep(10);
+ }
+}
+
+TEST_CASE("zcache.rpc.partialchunks")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance Server(TestEnv);
+ SpawnServer(Server, LocalCfg);
+
+ std::vector<CompressedBuffer> Attachments;
+
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort());
+
+ auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey {
+ IoHash KeyHash;
+ ((size_t*)(KeyHash.Hash))[0] = KeyIndex;
+ return CacheKey::Create(Bucket, KeyHash);
+ };
+
+ auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
+ const CacheKey& CacheKey,
+ size_t AttachmentCount,
+ size_t AttachmentsSize,
+ CachePolicy RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> {
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentBuffers;
+ std::vector<cacherequests::PutCacheRecordRequestValue> Attachments;
+ for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++)
+ {
+ CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize);
+ AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value));
+ Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)});
+ }
+ Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy});
+ return AttachmentBuffers;
+ };
+
+ auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey](
+ std::string_view BaseUri,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ size_t KeyOffset,
+ size_t Num,
+ size_t AttachmentCount,
+ size_t AttachmentsSize =
+ 8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> {
+ std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys;
+
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
+ for (size_t Key = 1; Key <= Num; ++Key)
+ {
+ const CacheKey NewCacheKey = GenerateKey(Bucket, KeyOffset + Key);
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments =
+ AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default);
+ Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments)));
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ HttpClient Http{BaseUri};
+
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ if (Result.StatusCode != HttpResponseCode::OK)
+ {
+ ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", ToString(Result.StatusCode), Result.ErrorMessage(""));
+ Keys.clear();
+ }
+
+ return Keys;
+ };
+
+ std::string_view TestBucket = "partialcachevaluetests"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+ auto RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u);
+ CHECK(RecordsWithSmallAttachments.size() == 3);
+ auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u);
+ CHECK(RecordsWithLargeAttachments.size() == 1);
+
+ struct PartialOptions
+ {
+ uint64_t Offset = 0ull;
+ uint64_t Size = ~0ull;
+ RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone;
+ };
+
+ auto GetCacheChunk = [](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ValueId,
+ const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult {
+ cacherequests::GetCacheChunksRequest Request = {
+ .AcceptMagic = kCbPkgMagic,
+ .AcceptOptions = (uint16_t)Options.AcceptOptions,
+ .Namespace = std::string(Namespace),
+ .Requests = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}};
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+
+ HttpClient Http{BaseUri};
+
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
+ cacherequests::GetCacheChunksResult GetCacheChunksResult;
+ CHECK(GetCacheChunksResult.Parse(Response));
+ return GetCacheChunksResult;
+ };
+
+ auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view BaseUri,
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& ChunkId,
+ const CompressedBuffer& VerifyData,
+ const PartialOptions& Options = {}) {
+ cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options);
+ CHECK(Result.Results.size() == 1);
+ bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks);
+ if (!CanGetPartial)
+ {
+ CHECK(Result.Results[0].FragmentOffset == 0);
+ CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize());
+ }
+ IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer();
+ IoBuffer ReceivedDecompressed =
+ Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].FragmentOffset, Options.Size).AsIoBuffer();
+ CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView()));
+ };
+
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second);
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithSmallAttachments[0].first,
+ RecordsWithSmallAttachments[0].second[0].first,
+ RecordsWithSmallAttachments[0].second[0].second,
+ PartialOptions{.Offset = 378,
+ .Size = 519,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
+ GetAndVerifyChunk(
+ BaseUri,
+ TestNamespace,
+ RecordsWithLargeAttachments[0].first,
+ RecordsWithLargeAttachments[0].second[0].first,
+ RecordsWithLargeAttachments[0].second[0].second,
+ PartialOptions{.Offset = 1024u * 1024u,
+ .Size = 512u * 1024u,
+ .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences |
+ RpcAcceptOptions::kAllowPartialCacheChunks});
+}
+
+IoBuffer
+FormatPackageBody(const CbPackage& Package)
+{
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ Body.SetContentType(HttpContentType::kCbPackage);
+ return Body;
+}
+
+TEST_CASE("zcache.rpc.allpolicies")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamServer(TestEnv);
+ SpawnServer(UpstreamServer, UpstreamCfg);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalServer(TestEnv);
+ SpawnServer(LocalServer, LocalCfg);
+
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort());
+ HttpClient Http{BaseUri};
+
+ std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
+ std::string_view TestBucket = "allpoliciestest"sv;
+ std::string_view TestNamespace = "ue4.ddc"sv;
+
+ // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
+ // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
+ constexpr int NumKeys = 256;
+ constexpr int NumValues = 4;
+ Oid ValueIds[NumValues];
+ IoHash Hash;
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ ExtendableStringBuilder<16> ValueName;
+ ValueName << "ValueId_"sv << ValueIndex;
+ static_assert(sizeof(IoHash) >= sizeof(Oid));
+ ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
+ }
+
+ struct KeyData;
+ struct UserData
+ {
+ UserData& Set(KeyData* InKeyData, int InValueIndex)
+ {
+ Data = InKeyData;
+ ValueIndex = InValueIndex;
+ return *this;
+ }
+ KeyData* Data = nullptr;
+ int ValueIndex = 0;
+ };
+ struct KeyData
+ {
+ CompressedBuffer BufferValues[NumValues];
+ uint64_t IntValues[NumValues];
+ UserData ValueUserData[NumValues];
+ bool ReceivedChunk[NumValues];
+ CacheKey Key;
+ UserData KeyUserData;
+ uint32_t KeyIndex = 0;
+ bool GetRequestsData = true;
+ bool UseValueAPI = false;
+ bool UseValuePolicy = false;
+ bool ForceMiss = false;
+ bool UseLocal = true;
+ bool UseRemote = true;
+ bool ShouldBeHit = true;
+ bool ReceivedPut = false;
+ bool ReceivedGet = false;
+ bool ReceivedPutValue = false;
+ bool ReceivedGetValue = false;
+ };
+ struct CachePutRequest
+ {
+ CacheKey Key;
+ CbObject Record;
+ CacheRecordPolicy Policy;
+ KeyData* Values;
+ UserData* Data;
+ };
+ struct CachePutValueRequest
+ {
+ CacheKey Key;
+ CompressedBuffer Value;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetRequest
+ {
+ CacheKey Key;
+ CacheRecordPolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetValueRequest
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetChunkRequest
+ {
+ CacheKey Key;
+ Oid ValueId;
+ uint64_t RawOffset;
+ uint64_t RawSize;
+ IoHash RawHash;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+
+ KeyData KeyDatas[NumKeys];
+ std::vector<CachePutRequest> PutRequests;
+ std::vector<CachePutValueRequest> PutValueRequests;
+ std::vector<CacheGetRequest> GetRequests;
+ std::vector<CacheGetValueRequest> GetValueRequests;
+ std::vector<CacheGetChunkRequest> ChunkRequests;
+
+ for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
+ {
+ IoHashStream KeyWriter;
+ KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
+ KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
+ IoHash KeyHash = KeyWriter.GetHash();
+ KeyData& KeyData = KeyDatas[KeyIndex];
+
+ KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
+ KeyData.KeyIndex = KeyIndex;
+ KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
+ KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
+ KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
+ KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
+ KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
+ KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
+ KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
+ CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
+ SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
+ CachePolicy PutPolicy = SharedPolicy;
+ CachePolicy GetPolicy = SharedPolicy;
+ GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
+ CacheKey& Key = KeyData.Key;
+
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
+ KeyData.BufferValues[ValueIndex] =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
+ KeyData.ReceivedChunk[ValueIndex] = false;
+ }
+
+ UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
+ }
+ if (!KeyData.UseValueAPI)
+ {
+ CbObjectWriter Builder;
+ Builder.BeginObject("key"sv);
+ Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Builder.EndObject();
+ Builder.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Builder.BeginObject();
+ Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash());
+ Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize());
+ Builder.EndObject();
+ }
+ Builder.EndArray();
+
+ CacheRecordPolicy PutRecordPolicy;
+ CacheRecordPolicy GetRecordPolicy;
+ if (!KeyData.UseValuePolicy)
+ {
+ PutRecordPolicy = CacheRecordPolicy(PutPolicy);
+ GetRecordPolicy = CacheRecordPolicy(GetPolicy);
+ }
+ else
+ {
+ // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
+ // it will use the wrong value for SkipData and fail our tests.
+ CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
+ CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
+ GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
+ }
+ PutRecordPolicy = PutBuilder.Build();
+ GetRecordPolicy = GetBuilder.Build();
+ }
+ if (!KeyData.ForceMiss)
+ {
+ PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
+ }
+ GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
+ ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
+ }
+ }
+ else
+ {
+ if (!KeyData.ForceMiss)
+ {
+ PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
+ }
+ GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
+ ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
+ }
+ }
+
+ // PutCacheRecords
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(PutRequests.size());
+ for (CachePutRequest& PutRequest : PutRequests)
+ {
+ cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back();
+ RecordRequest.Key = PutRequest.Key;
+ RecordRequest.Policy = PutRequest.Policy;
+ RecordRequest.Values.reserve(NumValues);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]});
+ }
+ PutRequest.Data->Data->ReceivedPut = true;
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageBody(Package);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheRecords unexpectedly failed.");
+ }
+
+ // PutCacheValues
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+
+ cacherequests::PutCacheValuesRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(PutValueRequests.size());
+ for (CachePutValueRequest& PutRequest : PutValueRequests)
+ {
+ Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy});
+ PutRequest.Data->Data->ReceivedPutValue = true;
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+
+ IoBuffer Body = FormatPackageBody(Package);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "PutCacheValues unexpectedly failed.");
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.ForceMiss)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str());
+ }
+ else
+ {
+ CHECK_MESSAGE(KeyData.ReceivedPutValue,
+ WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str());
+ }
+ }
+ }
+
+ // GetCacheRecords
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ Request.Requests.reserve(GetRequests.size());
+ for (CacheGetRequest& GetRequest : GetRequests)
+ {
+ Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
+ }
+
+ CbPackage Package;
+ CHECK(Request.Format(Package));
+ IoBuffer Body = FormatPackageBody(Package);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheRecords unexpectedly failed.");
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load.");
+ cacherequests::GetCacheRecordsResult RequestResult;
+ CHECK(RequestResult.Parse(Response));
+ CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count.");
+ for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results)
+ {
+ bool Succeeded = RecordResult.has_value();
+ CacheGetRequest& GetRequest = GetRequests[Index++];
+ KeyData* KeyData = GetRequest.Data->Data;
+ KeyData->ReceivedGet = true;
+ WriteToString<32> Name("Get(", KeyData->KeyIndex, ")");
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str());
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CHECK_MESSAGE(RecordResult->Values.size() == NumValues,
+ WriteToString<32>(Name, " number of values did not match.").c_str());
+ for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values)
+ {
+ int ExpectedValueIndex = 0;
+ for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
+ {
+ if (ValueIds[ExpectedValueIndex] == Value.Id)
+ {
+ break;
+ }
+ }
+ CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str());
+
+ WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")");
+
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
+ CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(ValueName, " RawHash did not match.").c_str());
+ CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(ValueName, " RawSize did not match.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = Value.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize,
+ WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+ }
+
+ // GetCacheValues
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+
+ cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ GetCacheValuesRequest.Requests.reserve(GetValueRequests.size());
+ for (CacheGetValueRequest& GetRequest : GetValueRequests)
+ {
+ GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
+ }
+
+ CbPackage Package;
+ CHECK(GetCacheValuesRequest.Format(Package));
+
+ IoBuffer Body = FormatPackageBody(Package);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheValues unexpectedly failed.");
+ IoBuffer MessageBuffer(Result.ResponsePayload);
+ CbPackage Response = ParsePackageMessage(MessageBuffer);
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load.");
+ cacherequests::GetCacheValuesResult GetCacheValuesResult;
+ CHECK(GetCacheValuesResult.Parse(Response));
+ for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results)
+ {
+ bool Succeeded = ValueResult.RawHash != IoHash::Zero;
+ CacheGetValueRequest& Request = GetValueRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ KeyData->ReceivedGetValue = true;
+ WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv);
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str());
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
+ CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(Name, " RawHash did not match.").c_str());
+ CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(Name, " RawSize did not match.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ValueResult.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
+ WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[0];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+
+ // GetCacheChunks
+ {
+ std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
+ return A.Key.Hash < B.Key.Hash;
+ });
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic = kCbPkgMagic,
+ .DefaultPolicy = BatchDefaultPolicy,
+ .Namespace = std::string(TestNamespace)};
+ GetCacheChunksRequest.Requests.reserve(ChunkRequests.size());
+ for (CacheGetChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ GetCacheChunksRequest.Requests.push_back({.Key = ChunkRequest.Key,
+ .ValueId = ChunkRequest.ValueId,
+ .ChunkId = IoHash(),
+ .RawOffset = ChunkRequest.RawOffset,
+ .RawSize = ChunkRequest.RawSize,
+ .Policy = ChunkRequest.Policy});
+ }
+ CbPackage Package;
+ CHECK(GetCacheChunksRequest.Format(Package));
+
+ IoBuffer Body = FormatPackageBody(Package);
+ HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK_MESSAGE(Result.StatusCode == HttpResponseCode::OK, "GetCacheChunks unexpectedly failed.");
+ CbPackage Response = ParsePackageMessage(Result.ResponsePayload);
+ bool Loaded = !Response.IsNull();
+ CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
+ cacherequests::GetCacheChunksResult GetCacheChunksResult;
+ CHECK(GetCacheChunksResult.Parse(Response));
+ CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(),
+ "GetCacheChunks response count did not match request count.");
+
+ for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results)
+ {
+ bool Succeeded = ValueResult.RawHash != IoHash::Zero;
+
+ CacheGetChunkRequest& Request = ChunkRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0;
+ KeyData->ReceivedChunk[ValueIndex] = true;
+ WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv);
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str());
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str());
+ }
+ if (KeyData->ShouldBeHit && Succeeded)
+ {
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
+ CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
+ WriteToString<32>(Name, " had unexpected RawHash.").c_str());
+ CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
+ WriteToString<32>(Name, " had unexpected RawSize.").c_str());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ValueResult.Body.Decompress();
+ CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
+ WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
+ CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
+ }
+ }
+ }
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ CHECK_MESSAGE(
+ KeyData.ReceivedChunk[ValueIndex],
+ WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str());
+ }
+ }
+ else
+ {
+ CHECK_MESSAGE(KeyData.ReceivedGetValue,
+ WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ CHECK_MESSAGE(KeyData.ReceivedChunk[0],
+ WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
+ }
+ }
+}
+
+} // namespace zen::tests
+
+#endif