aboutsummaryrefslogtreecommitdiff
path: root/zenserver-test/zenserver-test.cpp
diff options
context:
space:
mode:
authormattpetersepic <[email protected]>2022-02-09 09:03:37 -0700
committerGitHub <[email protected]>2022-02-09 09:03:37 -0700
commitadc7927757c1f917a00032268ba1fa1e26562d4f (patch)
tree2747a0ceec1b92913d06db2740774f2d0e016b6a /zenserver-test/zenserver-test.cpp
parentRemove the backwards compatibility for the Zen CachePolicy changes no… (#49) (diff)
downloadzen-adc7927757c1f917a00032268ba1fa1e26562d4f.tar.xz
zen-adc7927757c1f917a00032268ba1fa1e26562d4f.zip
Simplify HandleRpcGetCacheChunks (#53)
Refactor HandleRpcGetCacheChunks to reduce complexity. Port CacheStore tests from Unreal.
Diffstat (limited to 'zenserver-test/zenserver-test.cpp')
-rw-r--r--zenserver-test/zenserver-test.cpp623
1 files changed, 623 insertions, 0 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index b46106287..a9e096dac 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1712,6 +1712,629 @@ TEST_CASE("zcache.rpc")
}
}
+TEST_CASE("zcache.rpc.allpolicies")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
+ std::string_view TestBucket = "allpoliciestest"sv;
+
+ // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
+ // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
+ constexpr int NumKeys = 256;
+ constexpr int NumValues = 4;
+ Oid ValueIds[NumValues];
+ IoHash Hash;
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ ExtendableStringBuilder<16> ValueName;
+ ValueName << "ValueId_"sv << ValueIndex;
+ static_assert(sizeof(IoHash) >= sizeof(Oid));
+ ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
+ }
+
+ struct KeyData;
+ struct UserData
+ {
+ UserData& Set(KeyData* InKeyData, int InValueIndex)
+ {
+ KeyData = InKeyData;
+ ValueIndex = InValueIndex;
+ return *this;
+ }
+ KeyData* KeyData = nullptr;
+ int ValueIndex = 0;
+ };
+ struct KeyData
+ {
+ CompressedBuffer BufferValues[NumValues];
+ uint64_t IntValues[NumValues];
+ UserData ValueUserData[NumValues];
+ bool ReceivedChunk[NumValues];
+ CacheKey Key;
+ UserData KeyUserData;
+ uint32_t KeyIndex = 0;
+ bool GetRequestsData = true;
+ bool UseValueAPI = false;
+ bool UseValuePolicy = false;
+ bool ForceMiss = false;
+ bool UseLocal = true;
+ bool UseRemote = true;
+ bool ShouldBeHit = true;
+ bool ReceivedPut = false;
+ bool ReceivedGet = false;
+ bool ReceivedPutValue = false;
+ bool ReceivedGetValue = false;
+ };
+ struct CachePutRequest
+ {
+ CacheKey Key;
+ CbObject Record;
+ CacheRecordPolicy Policy;
+ KeyData* Values;
+ UserData* UserData;
+ };
+ struct CachePutValueRequest
+ {
+ CacheKey Key;
+ CompressedBuffer Value;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetRequest
+ {
+ CacheKey Key;
+ CacheRecordPolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetValueRequest
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetChunkRequest
+ {
+ CacheKey Key;
+ Oid ValueId;
+ uint64_t RawOffset;
+ uint64_t RawSize;
+ IoHash RawHash;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+
+ KeyData KeyDatas[NumKeys];
+ std::vector<CachePutRequest> PutRequests;
+ std::vector<CachePutValueRequest> PutValueRequests;
+ std::vector<CacheGetRequest> GetRequests;
+ std::vector<CacheGetValueRequest> GetValueRequests;
+ std::vector<CacheGetChunkRequest> ChunkRequests;
+
+ for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
+ {
+ IoHashStream KeyWriter;
+ KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
+ KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
+ IoHash KeyHash = KeyWriter.GetHash();
+ KeyData& KeyData = KeyDatas[KeyIndex];
+
+ KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
+ KeyData.KeyIndex = KeyIndex;
+ KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
+ KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
+ KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
+ KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
+ KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
+ KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
+ KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
+ CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
+ SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
+ CachePolicy PutPolicy = SharedPolicy;
+ CachePolicy GetPolicy = SharedPolicy;
+ GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
+ CacheKey& Key = KeyData.Key;
+
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
+ KeyData.BufferValues[ValueIndex] =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
+ KeyData.ReceivedChunk[ValueIndex] = false;
+ }
+
+ UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
+ }
+ if (!KeyData.UseValueAPI)
+ {
+ CbObjectWriter Builder;
+ Builder.BeginObject("key"sv);
+ Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Builder.EndObject();
+ Builder.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Builder.BeginObject();
+ Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash()));
+ Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize());
+ Builder.EndObject();
+ }
+ Builder.EndArray();
+
+ CacheRecordPolicy PutRecordPolicy;
+ CacheRecordPolicy GetRecordPolicy;
+ if (!KeyData.UseValuePolicy)
+ {
+ PutRecordPolicy = CacheRecordPolicy(PutPolicy);
+ GetRecordPolicy = CacheRecordPolicy(GetPolicy);
+ }
+ else
+ {
+ // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
+ // it will use the wrong value for SkipData and fail our tests.
+ CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
+ CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
+ GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
+ }
+ PutRecordPolicy = PutBuilder.Build();
+ GetRecordPolicy = GetBuilder.Build();
+ }
+ if (!KeyData.ForceMiss)
+ {
+ PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
+ }
+ GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
+ ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
+ }
+ }
+ else
+ {
+ if (!KeyData.ForceMiss)
+ {
+ PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
+ }
+ GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
+ ChunkRequests.push_back({Key, Oid(), 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
+ }
+ }
+
+ // PutCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutRequest& Request : PutRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Writer.BeginObject();
+ {
+ CompressedBuffer Buffer = Request.Values->BufferValues[ValueIndex];
+ Writer.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddInteger("RawSize"sv, Buffer.GetRawSize());
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ Request.UserData->KeyData->ReceivedPut = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ // PutCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutValueRequest& Request : PutValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ CompressedBuffer Buffer = Request.Value;
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ Request.UserData->KeyData->ReceivedPutValue = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.ForceMiss)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedPut);
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedPutValue);
+ }
+ }
+ }
+
+ // GetCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetRequest& Request : GetRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetRequests.size());
+ int Index = 0;
+ for (CbFieldView ResponseField : Responses)
+ {
+ CbObjectView RecordView = ResponseField.AsObjectView();
+ bool Succeeded = !ResponseField.HasError();
+
+ CacheGetRequest& Request = GetRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ KeyData->ReceivedGet = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CbArrayView ValuesArray = RecordView["Values"sv].AsArrayView();
+ CHECK(ValuesArray.Num() == NumValues);
+
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ActualValueId = ValueObject["Id"sv].AsObjectId();
+ int ExpectedValueIndex = 0;
+ for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
+ {
+ if (ValueIds[ExpectedValueIndex] == ActualValueId)
+ {
+ break;
+ }
+ }
+ CHECK(ExpectedValueIndex < NumValues);
+ IoHash ActualRawHash = ValueObject["RawHash"sv].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = ValueObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+ }
+
+ // GetCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetValueRequest& Request : GetValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetValueRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetValueRequest& Request = GetValueRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ KeyData->ReceivedGetValue = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[0];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ // GetCacheChunks
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
+ return A.Key.Hash < B.Key.Hash;
+ });
+ Writer << "Method"sv
+ << "GetCacheChunks"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("ChunkRequests"sv);
+ for (CacheGetChunkRequest& Request : ChunkRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddObjectId("ValueId"sv, ValueIds[Request.UserData->ValueIndex]);
+ Writer.AddInteger("RawOffset"sv, Request.RawOffset);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
+ Writer.AddHash("ChunkId"sv, IoHash());
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == ChunkRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetChunkRequest& Request = ChunkRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ int ValueIndex = Request.UserData->ValueIndex >= 0 ? Request.UserData->ValueIndex : 0;
+ KeyData->ReceivedChunk[ValueIndex] = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (KeyData->ShouldBeHit && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedGet);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ CHECK(KeyData.ReceivedChunk[ValueIndex]);
+ }
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedGetValue);
+ CHECK(KeyData.ReceivedChunk[0]);
+ }
+ }
+}
+
# if ZEN_USE_EXEC
struct RemoteExecutionRequest