aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zenserver-test/zenserver-test.cpp623
-rw-r--r--zenserver/cache/structuredcache.cpp552
-rw-r--r--zenserver/cache/structuredcache.h37
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h1
4 files changed, 942 insertions, 271 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index b46106287..a9e096dac 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1712,6 +1712,629 @@ TEST_CASE("zcache.rpc")
}
}
+TEST_CASE("zcache.rpc.allpolicies")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
+ std::string_view TestBucket = "allpoliciestest"sv;
+
+ // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
+ // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
+ constexpr int NumKeys = 256;
+ constexpr int NumValues = 4;
+ Oid ValueIds[NumValues];
+ IoHash Hash;
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ ExtendableStringBuilder<16> ValueName;
+ ValueName << "ValueId_"sv << ValueIndex;
+ static_assert(sizeof(IoHash) >= sizeof(Oid));
+ ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
+ }
+
+ struct KeyData;
+ struct UserData
+ {
+ UserData& Set(KeyData* InKeyData, int InValueIndex)
+ {
+ KeyData = InKeyData;
+ ValueIndex = InValueIndex;
+ return *this;
+ }
+ KeyData* KeyData = nullptr;
+ int ValueIndex = 0;
+ };
+ struct KeyData
+ {
+ CompressedBuffer BufferValues[NumValues];
+ uint64_t IntValues[NumValues];
+ UserData ValueUserData[NumValues];
+ bool ReceivedChunk[NumValues];
+ CacheKey Key;
+ UserData KeyUserData;
+ uint32_t KeyIndex = 0;
+ bool GetRequestsData = true;
+ bool UseValueAPI = false;
+ bool UseValuePolicy = false;
+ bool ForceMiss = false;
+ bool UseLocal = true;
+ bool UseRemote = true;
+ bool ShouldBeHit = true;
+ bool ReceivedPut = false;
+ bool ReceivedGet = false;
+ bool ReceivedPutValue = false;
+ bool ReceivedGetValue = false;
+ };
+ struct CachePutRequest
+ {
+ CacheKey Key;
+ CbObject Record;
+ CacheRecordPolicy Policy;
+ KeyData* Values;
+ UserData* UserData;
+ };
+ struct CachePutValueRequest
+ {
+ CacheKey Key;
+ CompressedBuffer Value;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetRequest
+ {
+ CacheKey Key;
+ CacheRecordPolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetValueRequest
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+ struct CacheGetChunkRequest
+ {
+ CacheKey Key;
+ Oid ValueId;
+ uint64_t RawOffset;
+ uint64_t RawSize;
+ IoHash RawHash;
+ CachePolicy Policy;
+ UserData* UserData;
+ };
+
+ KeyData KeyDatas[NumKeys];
+ std::vector<CachePutRequest> PutRequests;
+ std::vector<CachePutValueRequest> PutValueRequests;
+ std::vector<CacheGetRequest> GetRequests;
+ std::vector<CacheGetValueRequest> GetValueRequests;
+ std::vector<CacheGetChunkRequest> ChunkRequests;
+
+ for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
+ {
+ IoHashStream KeyWriter;
+ KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
+ KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
+ IoHash KeyHash = KeyWriter.GetHash();
+ KeyData& KeyData = KeyDatas[KeyIndex];
+
+ KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
+ KeyData.KeyIndex = KeyIndex;
+ KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
+ KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
+ KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
+ KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
+ KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
+ KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
+ KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
+ CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
+ SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
+ CachePolicy PutPolicy = SharedPolicy;
+ CachePolicy GetPolicy = SharedPolicy;
+ GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
+ CacheKey& Key = KeyData.Key;
+
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
+ KeyData.BufferValues[ValueIndex] =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
+ KeyData.ReceivedChunk[ValueIndex] = false;
+ }
+
+ UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
+ }
+ if (!KeyData.UseValueAPI)
+ {
+ CbObjectWriter Builder;
+ Builder.BeginObject("key"sv);
+ Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Builder.EndObject();
+ Builder.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Builder.BeginObject();
+ Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash()));
+ Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize());
+ Builder.EndObject();
+ }
+ Builder.EndArray();
+
+ CacheRecordPolicy PutRecordPolicy;
+ CacheRecordPolicy GetRecordPolicy;
+ if (!KeyData.UseValuePolicy)
+ {
+ PutRecordPolicy = CacheRecordPolicy(PutPolicy);
+ GetRecordPolicy = CacheRecordPolicy(GetPolicy);
+ }
+ else
+ {
+ // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
+ // it will use the wrong value for SkipData and fail our tests.
+ CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
+ CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
+ GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
+ }
+ PutRecordPolicy = PutBuilder.Build();
+ GetRecordPolicy = GetBuilder.Build();
+ }
+ if (!KeyData.ForceMiss)
+ {
+ PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
+ }
+ GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
+ ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
+ }
+ }
+ else
+ {
+ if (!KeyData.ForceMiss)
+ {
+ PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
+ }
+ GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
+ ChunkRequests.push_back({Key, Oid(), 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
+ }
+ }
+
+ // PutCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutRequest& Request : PutRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Writer.BeginObject();
+ {
+ CompressedBuffer Buffer = Request.Values->BufferValues[ValueIndex];
+ Writer.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddInteger("RawSize"sv, Buffer.GetRawSize());
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ Request.UserData->KeyData->ReceivedPut = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ // PutCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutValueRequest& Request : PutValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ CompressedBuffer Buffer = Request.Value;
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ Request.UserData->KeyData->ReceivedPutValue = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.ForceMiss)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedPut);
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedPutValue);
+ }
+ }
+ }
+
+ // GetCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetRequest& Request : GetRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetRequests.size());
+ int Index = 0;
+ for (CbFieldView ResponseField : Responses)
+ {
+ CbObjectView RecordView = ResponseField.AsObjectView();
+ bool Succeeded = !ResponseField.HasError();
+
+ CacheGetRequest& Request = GetRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ KeyData->ReceivedGet = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CbArrayView ValuesArray = RecordView["Values"sv].AsArrayView();
+ CHECK(ValuesArray.Num() == NumValues);
+
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ActualValueId = ValueObject["Id"sv].AsObjectId();
+ int ExpectedValueIndex = 0;
+ for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
+ {
+ if (ValueIds[ExpectedValueIndex] == ActualValueId)
+ {
+ break;
+ }
+ }
+ CHECK(ExpectedValueIndex < NumValues);
+ IoHash ActualRawHash = ValueObject["RawHash"sv].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = ValueObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+ }
+
+ // GetCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetValueRequest& Request : GetValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetValueRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetValueRequest& Request = GetValueRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ KeyData->ReceivedGetValue = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[0];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ // GetCacheChunks
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
+ return A.Key.Hash < B.Key.Hash;
+ });
+ Writer << "Method"sv
+ << "GetCacheChunks"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("ChunkRequests"sv);
+ for (CacheGetChunkRequest& Request : ChunkRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddObjectId("ValueId"sv, ValueIds[Request.UserData->ValueIndex]);
+ Writer.AddInteger("RawOffset"sv, Request.RawOffset);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
+ Writer.AddHash("ChunkId"sv, IoHash());
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == ChunkRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetChunkRequest& Request = ChunkRequests[Index++];
+ KeyData* KeyData = Request.UserData->KeyData;
+ int ValueIndex = Request.UserData->ValueIndex >= 0 ? Request.UserData->ValueIndex : 0;
+ KeyData->ReceivedChunk[ValueIndex] = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (KeyData->ShouldBeHit && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedGet);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ CHECK(KeyData.ReceivedChunk[ValueIndex]);
+ }
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedGetValue);
+ CHECK(KeyData.ReceivedChunk[0]);
+ }
+ }
+}
+
# if ZEN_USE_EXEC
struct RemoteExecutionRequest
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index e23030e24..3d5359188 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1537,433 +1537,465 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
-namespace GetCacheChunks::detail {
+namespace cache::detail {
- struct ValueData
+ struct RecordValue
{
Oid ValueId;
IoHash ContentId;
uint64_t RawSize;
};
- struct KeyRequestData
- {
- CacheKeyRequest Upstream;
- IoBuffer CacheValue;
- std::vector<ValueData> Values;
- CachePolicy DownstreamRecordPolicy;
- CachePolicy DownstreamPolicy;
- std::string_view Source;
- bool Exists = false;
- bool HasRequest = false;
- bool HasRecordRequest = false;
- bool HasValueRequest = false;
- bool ValuesRead = false;
+ struct Record
+ {
+ IoBuffer CacheValue;
+ std::vector<RecordValue> Values;
+ std::string_view Source;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool ValuesRead = false;
};
- struct ChunkRequestData
- {
- CacheChunkRequest Upstream;
- KeyRequestData* KeyRequest;
- size_t KeyRequestIndex;
- CachePolicy DownstreamPolicy;
- CompressedBuffer Value;
- std::string_view Source;
- uint64_t TotalSize = 0;
- bool Exists = false;
- bool IsRecordRequest = false;
- bool TotalSizeKnown = false;
+ struct ChunkRequest
+ {
+ CacheChunkRequest* Key = nullptr;
+ Record* Record = nullptr;
+ CompressedBuffer Value;
+ std::string_view Source;
+ uint64_t TotalSize = 0;
+ uint64_t RequestedSize = 0;
+ uint64_t RequestedOffset = 0;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool TotalSizeKnown = false;
+ bool IsRecordRequest = false;
};
-} // namespace GetCacheChunks::detail
+} // namespace cache::detail
void
HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
- std::vector<KeyRequestData> KeyRequests;
- std::vector<ChunkRequestData> Chunks;
- if (!TryGetCacheChunks_Parse(KeyRequests, Chunks, RpcRequest))
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<Record> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- GetCacheChunks_LoadKeys(KeyRequests);
- GetCacheChunks_LoadChunks(Chunks);
- GetCacheChunks_SendResults(Chunks, HttpRequest);
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ WriteGetCacheChunksResponse(Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
- std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- CbObjectView RpcRequest)
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+
+ // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
+ // we will need to change the pointers to indexes to handle reallocations.
+ RecordKeys.reserve(NumRequests);
+ Records.reserve(NumRequests);
+ RequestKeys.reserve(NumRequests);
+ Requests.reserve(NumRequests);
+ RecordRequests.reserve(NumRequests);
+ ValueRequests.reserve(NumRequests);
+
+ CacheKeyRequest* PreviousRecordKey = nullptr;
+ Record* PreviousRecord = nullptr;
- KeyRequestData* PreviousKeyRequest = nullptr;
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- Chunks.reserve(ChunkRequestsArray.Num());
for (CbFieldView RequestView : ChunkRequestsArray)
{
- ChunkRequestData& Chunk = Chunks.emplace_back();
- CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
+ ChunkRequest& Request = Requests.emplace_back();
+ Request.Key = &RequestKey;
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
CbFieldView HashField = KeyObject["Hash"sv];
- Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
- if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError())
+ RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
+ if (RequestKey.Key.Bucket.empty() || HashField.HasError())
{
ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
return false;
}
- KeyRequestData* KeyRequest = nullptr;
- if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key)
- {
- KeyRequest = &KeyRequests.emplace_back();
- KeyRequest->Upstream.Key = Chunk.Upstream.Key;
- PreviousKeyRequest = KeyRequest;
- }
- else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key))
+ RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ Request.RequestedSize = RequestKey.RawSize;
+ Request.RequestedOffset = RequestKey.RawOffset;
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ Request.IsRecordRequest = (bool)RequestKey.ValueId;
+
+ if (!Request.IsRecordRequest)
{
- KeyRequest = PreviousKeyRequest;
+ ValueRequests.push_back(&Request);
}
else
{
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- PreviousKeyRequest->Upstream.Key.Bucket,
- PreviousKeyRequest->Upstream.Key.Hash);
- return false;
- }
- Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest);
-
- Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash();
- Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId();
- Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId;
+ RecordRequests.push_back(&Request);
+ CacheKeyRequest* RecordKey = nullptr;
+ Record* Record = nullptr;
- if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero)
- {
- KeyRequest->DownstreamPolicy =
- KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy;
- KeyRequest->HasRequest = true;
- (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true;
+ if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
+ {
+ RecordKey = &RecordKeys.emplace_back();
+ PreviousRecordKey = RecordKey;
+ Record = &Records.emplace_back();
+ PreviousRecord = Record;
+ RecordKey->Key = RequestKey.Key;
+ }
+ else if (RequestKey.Key == PreviousRecordKey->Key)
+ {
+ RecordKey = PreviousRecordKey;
+ Record = PreviousRecord;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ RequestKey.Key.Bucket,
+ RequestKey.Key.Hash,
+ PreviousRecordKey->Key.Bucket,
+ PreviousRecordKey->Key.Hash);
+ return false;
+ }
+ Request.Record = Record;
+ if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
+ {
+ Record->DownstreamPolicy =
+ Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
+ Record->HasRequest = true;
+ }
}
}
- if (Chunks.empty())
+ if (Requests.empty())
{
return false;
}
- for (ChunkRequestData& Chunk : Chunks)
- {
- Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex];
- }
return true;
}
void
-HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests)
+HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- std::vector<KeyRequestData*> UpstreamValueRequests;
- for (KeyRequestData& KeyRequest : KeyRequests)
+ for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
- if (KeyRequest.HasRequest)
+ CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
+ Record& Record = Records[RecordIndex];
+ if (Record.HasRequest)
{
- if (KeyRequest.HasRecordRequest)
- {
- KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta;
- }
+ Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
- if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
- // There's currently no interface for checking only whether a CacheValue exists without loading it,
- // so we load it here even if SkipData is true and its a CacheValue request.
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
- KeyRequest.Exists = true;
- KeyRequest.CacheValue = std::move(CacheValue.Value);
- KeyRequest.Source = "LOCAL"sv;
+ Record.Exists = true;
+ Record.CacheValue = std::move(CacheValue.Value);
+ Record.Source = "LOCAL"sv;
}
}
- if (!KeyRequest.Exists)
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
{
- // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't
- // know which,
- // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both
- // kinds and pass the request that uses the one that succeeds.
- if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote))
- {
- KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy));
- UpstreamRecordRequests.push_back(&KeyRequest.Upstream);
- }
- if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- UpstreamValueRequests.push_back(&KeyRequest);
- }
+ RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
+ UpstreamRecordRequests.push_back(&RecordKey);
}
}
}
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
}
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ Record& Record = Records[RecordIndex];
- KeyRequestData& KeyRequest =
- *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream));
- const CacheKey& Key = KeyRequest.Upstream.Key;
- KeyRequest.Exists = true;
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject);
- KeyRequest.Source = "UPSTREAM"sv;
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = "UPSTREAM"sv;
- if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
- if (!UpstreamValueRequests.empty())
- {
- for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests)
- {
- KeyRequestData& KeyRequest = *KeyRequestPtr;
- CacheKey& Key = KeyRequest.Upstream.Key;
- GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
- if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
- {
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
- if (Result)
- {
- KeyRequest.CacheValue = std::move(UpstreamResult.Value);
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary);
- KeyRequest.Exists = true;
- KeyRequest.Source = "UPSTREAM"sv;
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
- }
- }
- }
- }
- }
-}
-
-void
-HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks)
-{
- using namespace GetCacheChunks::detail;
-
std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest* Request : RecordRequests)
{
- if (Chunk.IsRecordRequest)
+ if (Request->Key->ChunkId == IoHash::Zero)
{
- if (Chunk.Upstream.ChunkId == IoHash::Zero)
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ Record& Record = *Request->Record;
+ if (!Record.ValuesRead)
{
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- KeyRequestData& KeyRequest = *Chunk.KeyRequest;
- if (!KeyRequest.ValuesRead)
+ Record.ValuesRead = true;
+ if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
{
- KeyRequest.ValuesRead = true;
- if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
{
- CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- KeyRequest.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
{
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
}
}
}
+ }
- for (const ValueData& Value : KeyRequest.Values)
+ for (const RecordValue& Value : Record.Values)
+ {
+ if (Value.ValueId == Request->Key->ValueId)
{
- if (Value.ValueId == Chunk.Upstream.ValueId)
- {
- Chunk.Upstream.ChunkId = Value.ContentId;
- Chunk.TotalSize = Value.RawSize;
- Chunk.TotalSizeKnown = true;
- break;
- }
+ Request->Key->ChunkId = Value.ContentId;
+ Request->TotalSize = Value.RawSize;
+ Request->TotalSizeKnown = true;
+ break;
}
}
+ }
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Chunk.Upstream.ChunkId != IoHash::Zero)
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Request->Key->ChunkId != IoHash::Zero)
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown)
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
- if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId))
- {
- Chunk.Exists = true;
- Chunk.Source = "LOCAL"sv;
- }
+ Request->Exists = true;
+ Request->Source = "LOCAL"sv;
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId))
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (Compressed)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
- if (Compressed)
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
- {
- Chunk.Value = Compressed;
- }
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "LOCAL"sv;
+ Request->Value = Compressed;
}
+ Request->Exists = true;
+ Request->TotalSize = Compressed.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
- if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy);
- UpstreamPayloadRequests.push_back(&Chunk.Upstream);
- }
+ }
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
+ OutUpstreamChunks.push_back(Request->Key);
}
}
- else
+ }
+}
+
+void
+HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ using namespace cache::detail;
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (Chunk.KeyRequest->Exists)
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
- if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType()))
+ if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue));
- if (Compressed)
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ if (Result)
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = Compressed;
+ Request->Value = Result;
}
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = Chunk.KeyRequest->Source;
- Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
+ Request->Exists = true;
+ Request->TotalSize = Result.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
}
}
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
+ Request->Key->RawOffset = 0;
+ Request->Key->RawSize = UINT64_MAX;
+ }
+ OutUpstreamChunks.push_back(Request->Key);
+ }
}
+}
+
+void
+HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ using namespace cache::detail;
- if (!UpstreamPayloadRequests.empty())
+ if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
}
- ChunkRequestData& Chunk =
- *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream));
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ CacheChunkRequest& Key = Params.Request;
+ size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
+ ChunkRequest& Request = Requests[RequestIndex];
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize)
+ if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
+ IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
{
return;
}
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ if (Request.IsRecordRequest)
+ {
+ m_CidStore.AddChunk(Compressed);
+ }
+ else
+ {
+ m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
+ }
}
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = std::move(Compressed);
+ Request.Value = std::move(Compressed);
}
}
- Chunk.Exists = true;
- Chunk.TotalSize = Params.RawSize;
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "UPSTREAM"sv;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.TotalSize = Params.RawSize;
+ Request.TotalSizeKnown = true;
+ Request.Source = "UPSTREAM"sv;
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete));
}
}
void
-HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- zen::HttpServerRequest& HttpRequest)
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests,
+ zen::HttpServerRequest& HttpRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
CbPackage RpcResponse;
CbObjectWriter Writer;
Writer.BeginArray("Result"sv);
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest& Request : Requests)
{
Writer.BeginObject();
{
- if (Chunk.Exists)
+ if (Request.Exists)
{
- Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId);
- if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
+ if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
+ RpcResponse.AddAttachment(CbAttachment(Request.Value));
}
else
{
- Writer.AddInteger("RawSize"sv, Chunk.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.TotalSize);
}
- ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- Chunk.Upstream.ValueId,
- NiceBytes(Chunk.TotalSize),
- Chunk.IsRecordRequest ? "Record"sv : "Value"sv,
- Chunk.Source);
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceBytes(Request.TotalSize),
+ Request.IsRecordRequest ? "Record"sv : "Value"sv,
+ Request.Source);
m_CacheStats.HitCount++;
}
- else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query))
+ else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
{
- ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
m_CacheStats.MissCount++;
}
}
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 9d60c5bf4..3477e6c4f 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -17,6 +17,8 @@ class logger;
namespace zen {
+struct CacheChunkRequest;
+struct CacheKeyRequest;
class CasStore;
class CidStore;
class CbObjectView;
@@ -26,10 +28,10 @@ class UpstreamCache;
class ZenCacheStore;
enum class CachePolicy : uint32_t;
-namespace GetCacheChunks::detail {
- struct KeyRequestData;
- struct ChunkRequestData;
-} // namespace GetCacheChunks::detail
+namespace cache::detail {
+ struct Record;
+ struct ChunkRequest;
+} // namespace cache::detail
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -114,12 +116,27 @@ private:
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
- bool TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
- std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- CbObjectView RpcRequest);
- void GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests);
- void GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks);
- void GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, zen::HttpServerRequest& HttpRequest);
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
+ void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index b249982fc..9a745e42c 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -10,7 +10,6 @@
#include <gsl/gsl-lite.hpp>
#include <span>
-
namespace zen::Private {
class ICacheRecordPolicyShared;
}