diff options
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 623 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 552 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 37 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachepolicy.h | 1 |
4 files changed, 942 insertions, 271 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index b46106287..a9e096dac 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1712,6 +1712,629 @@ TEST_CASE("zcache.rpc") } } +TEST_CASE("zcache.rpc.allpolicies") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; + std::string_view TestBucket = "allpoliciestest"sv; + + // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) + // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) + constexpr int NumKeys = 256; + constexpr int NumValues = 4; + Oid ValueIds[NumValues]; + IoHash Hash; + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + ExtendableStringBuilder<16> ValueName; + ValueName << "ValueId_"sv << ValueIndex; + static_assert(sizeof(IoHash) >= sizeof(Oid)); + ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); + } + + struct KeyData; + struct UserData + { + UserData& Set(KeyData* InKeyData, int InValueIndex) + { + KeyData = InKeyData; + ValueIndex = InValueIndex; + return *this; + } + KeyData* KeyData = nullptr; + int ValueIndex = 0; + }; + struct KeyData + { + CompressedBuffer BufferValues[NumValues]; + uint64_t IntValues[NumValues]; + UserData ValueUserData[NumValues]; + bool ReceivedChunk[NumValues]; + CacheKey Key; + UserData KeyUserData; + uint32_t KeyIndex = 0; + bool GetRequestsData = true; + bool UseValueAPI = false; + bool UseValuePolicy = false; + bool ForceMiss = false; + bool UseLocal = true; + bool UseRemote = true; + bool ShouldBeHit = true; + bool ReceivedPut = false; + bool ReceivedGet = false; + bool ReceivedPutValue = false; + bool ReceivedGetValue = false; + }; + struct CachePutRequest + { + CacheKey Key; + CbObject Record; + CacheRecordPolicy Policy; + KeyData* Values; + UserData* UserData; + }; + struct CachePutValueRequest + { + CacheKey Key; + CompressedBuffer Value; + CachePolicy Policy; + UserData* UserData; + }; + struct CacheGetRequest + { + CacheKey Key; + CacheRecordPolicy Policy; + UserData* UserData; + }; + struct CacheGetValueRequest + { + CacheKey Key; + CachePolicy Policy; + UserData* UserData; + }; + struct CacheGetChunkRequest + { + CacheKey Key; + Oid ValueId; + uint64_t RawOffset; + uint64_t RawSize; + IoHash RawHash; + CachePolicy Policy; + UserData* UserData; + }; + + KeyData KeyDatas[NumKeys]; + std::vector<CachePutRequest> PutRequests; + std::vector<CachePutValueRequest> PutValueRequests; + std::vector<CacheGetRequest> GetRequests; + std::vector<CacheGetValueRequest> GetValueRequests; + std::vector<CacheGetChunkRequest> ChunkRequests; + + for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) + { + IoHashStream KeyWriter; + KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); + KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); + IoHash KeyHash = KeyWriter.GetHash(); + KeyData& KeyData = KeyDatas[KeyIndex]; + + KeyData.Key = CacheKey::Create(TestBucket, KeyHash); + KeyData.KeyIndex = KeyIndex; + KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; + KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; + KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; + KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; + KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; + KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; + KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); + CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; + SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; + CachePolicy PutPolicy = SharedPolicy; + CachePolicy GetPolicy = SharedPolicy; + GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; + CacheKey& Key = KeyData.Key; + + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); + KeyData.BufferValues[ValueIndex] = + CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); + KeyData.ReceivedChunk[ValueIndex] = false; + } + + UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); + } + if (!KeyData.UseValueAPI) + { + CbObjectWriter Builder; + Builder.BeginObject("key"sv); + Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; + Builder.EndObject(); + Builder.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Builder.BeginObject(); + Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash())); + Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize()); + Builder.EndObject(); + } + Builder.EndArray(); + + CacheRecordPolicy PutRecordPolicy; + CacheRecordPolicy GetRecordPolicy; + if (!KeyData.UseValuePolicy) + { + PutRecordPolicy = CacheRecordPolicy(PutPolicy); + GetRecordPolicy = CacheRecordPolicy(GetPolicy); + } + else + { + // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies + // it will use the wrong value for SkipData and fail our tests. + CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); + CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); + GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); + } + PutRecordPolicy = PutBuilder.Build(); + GetRecordPolicy = GetBuilder.Build(); + } + if (!KeyData.ForceMiss) + { + PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); + } + GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; + ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); + } + } + else + { + if (!KeyData.ForceMiss) + { + PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); + } + GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); + ChunkRequests.push_back({Key, Oid(), 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); + } + } + + // PutCacheRecords + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CachePutRequest& Request : PutRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Writer.BeginObject(); + { + CompressedBuffer Buffer = Request.Values->BufferValues[ValueIndex]; + Writer.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash())); + Package.AddAttachment(CbAttachment(Buffer)); + Writer.AddInteger("RawSize"sv, Buffer.GetRawSize()); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + Request.Policy.Save(Writer); + } + Writer.EndObject(); + Request.UserData->KeyData->ReceivedPut = true; + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + } + + // PutCacheValues + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "PutCacheValues"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CachePutValueRequest& Request : PutValueRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + CompressedBuffer Buffer = Request.Value; + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash())); + Package.AddAttachment(CbAttachment(Buffer)); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + Request.UserData->KeyData->ReceivedPutValue = true; + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.ForceMiss) + { + if (!KeyData.UseValueAPI) + { + CHECK(KeyData.ReceivedPut); + } + else + { + CHECK(KeyData.ReceivedPutValue); + } + } + } + + // GetCacheRecords + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "GetCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CacheGetRequest& Request : GetRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + Request.Policy.Save(Writer); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == GetRequests.size()); + int Index = 0; + for (CbFieldView ResponseField : Responses) + { + CbObjectView RecordView = ResponseField.AsObjectView(); + bool Succeeded = !ResponseField.HasError(); + + CacheGetRequest& Request = GetRequests[Index++]; + KeyData* KeyData = Request.UserData->KeyData; + KeyData->ReceivedGet = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (!KeyData->ForceMiss && Succeeded) + { + CbArrayView ValuesArray = RecordView["Values"sv].AsArrayView(); + CHECK(ValuesArray.Num() == NumValues); + + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ActualValueId = ValueObject["Id"sv].AsObjectId(); + int ExpectedValueIndex = 0; + for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) + { + if (ValueIds[ExpectedValueIndex] == ActualValueId) + { + break; + } + } + CHECK(ExpectedValueIndex < NumValues); + IoHash ActualRawHash = ValueObject["RawHash"sv].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = ValueObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + } + + // GetCacheValues + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "GetCacheValues"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CacheGetValueRequest& Request : GetValueRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == GetValueRequests.size()); + int Index = 0; + for (CbFieldView RequestResultView : Responses) + { + CbObjectView RequestResultObject = RequestResultView.AsObjectView(); + CbFieldView RawHashField = RequestResultObject["RawHash"sv]; + IoHash ActualRawHash = RawHashField.AsHash(); + bool Succeeded = !RawHashField.HasError(); + + CacheGetValueRequest& Request = GetValueRequests[Index++]; + KeyData* KeyData = Request.UserData->KeyData; + KeyData->ReceivedGetValue = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (!KeyData->ForceMiss && Succeeded) + { + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[0]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + + // GetCacheChunks + { + CbPackage Package; + CbObjectWriter Writer; + std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { + return A.Key.Hash < B.Key.Hash; + }); + Writer << "Method"sv + << "GetCacheChunks"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("ChunkRequests"sv); + for (CacheGetChunkRequest& Request : ChunkRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.AddObjectId("ValueId"sv, ValueIds[Request.UserData->ValueIndex]); + Writer.AddInteger("RawOffset"sv, Request.RawOffset); + Writer.AddInteger("RawSize"sv, Request.RawSize); + Writer.AddHash("ChunkId"sv, IoHash()); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == ChunkRequests.size()); + int Index = 0; + for (CbFieldView RequestResultView : Responses) + { + CbObjectView RequestResultObject = RequestResultView.AsObjectView(); + CbFieldView RawHashField = RequestResultObject["RawHash"sv]; + IoHash ActualRawHash = RawHashField.AsHash(); + bool Succeeded = !RawHashField.HasError(); + + CacheGetChunkRequest& Request = ChunkRequests[Index++]; + KeyData* KeyData = Request.UserData->KeyData; + int ValueIndex = Request.UserData->ValueIndex >= 0 ? Request.UserData->ValueIndex : 0; + KeyData->ReceivedChunk[ValueIndex] = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (KeyData->ShouldBeHit && Succeeded) + { + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.UseValueAPI) + { + CHECK(KeyData.ReceivedGet); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + CHECK(KeyData.ReceivedChunk[ValueIndex]); + } + } + else + { + CHECK(KeyData.ReceivedGetValue); + CHECK(KeyData.ReceivedChunk[0]); + } + } +} + # if ZEN_USE_EXEC struct RemoteExecutionRequest diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index e23030e24..3d5359188 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1537,433 +1537,465 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } -namespace GetCacheChunks::detail { +namespace cache::detail { - struct ValueData + struct RecordValue { Oid ValueId; IoHash ContentId; uint64_t RawSize; }; - struct KeyRequestData - { - CacheKeyRequest Upstream; - IoBuffer CacheValue; - std::vector<ValueData> Values; - CachePolicy DownstreamRecordPolicy; - CachePolicy DownstreamPolicy; - std::string_view Source; - bool Exists = false; - bool HasRequest = false; - bool HasRecordRequest = false; - bool HasValueRequest = false; - bool ValuesRead = false; + struct Record + { + IoBuffer CacheValue; + std::vector<RecordValue> Values; + std::string_view Source; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool HasRequest = false; + bool ValuesRead = false; }; - struct ChunkRequestData - { - CacheChunkRequest Upstream; - KeyRequestData* KeyRequest; - size_t KeyRequestIndex; - CachePolicy DownstreamPolicy; - CompressedBuffer Value; - std::string_view Source; - uint64_t TotalSize = 0; - bool Exists = false; - bool IsRecordRequest = false; - bool TotalSizeKnown = false; + struct ChunkRequest + { + CacheChunkRequest* Key = nullptr; + Record* Record = nullptr; + CompressedBuffer Value; + std::string_view Source; + uint64_t TotalSize = 0; + uint64_t RequestedSize = 0; + uint64_t RequestedOffset = 0; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool TotalSizeKnown = false; + bool IsRecordRequest = false; }; -} // namespace GetCacheChunks::detail +} // namespace cache::detail void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); - std::vector<KeyRequestData> KeyRequests; - std::vector<ChunkRequestData> Chunks; - if (!TryGetCacheChunks_Parse(KeyRequests, Chunks, RpcRequest)) + std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream + std::vector<Record> Records; // Scratch-space data about a Record when fulfilling RecordRequests + std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream + std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest + std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key + std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key + std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + + // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests + if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } - GetCacheChunks_LoadKeys(KeyRequests); - GetCacheChunks_LoadChunks(Chunks); - GetCacheChunks_SendResults(Chunks, HttpRequest); + + // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we + // have it locally, and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + + // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheValues(ValueRequests, UpstreamChunks); + + // Call GetCacheChunks on the upstream for any payloads we do not have locally + GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + + // Send the payload and descriptive data about each chunk to the client + WriteGetCacheChunksResponse(Requests, HttpRequest); } bool -HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, - std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - CbObjectView RpcRequest) +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + + // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, + // we will need to change the pointers to indexes to handle reallocations. + RecordKeys.reserve(NumRequests); + Records.reserve(NumRequests); + RequestKeys.reserve(NumRequests); + Requests.reserve(NumRequests); + RecordRequests.reserve(NumRequests); + ValueRequests.reserve(NumRequests); + + CacheKeyRequest* PreviousRecordKey = nullptr; + Record* PreviousRecord = nullptr; - KeyRequestData* PreviousKeyRequest = nullptr; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - Chunks.reserve(ChunkRequestsArray.Num()); for (CbFieldView RequestView : ChunkRequestsArray) { - ChunkRequestData& Chunk = Chunks.emplace_back(); - CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); + ChunkRequest& Request = Requests.emplace_back(); + Request.Key = &RequestKey; CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView HashField = KeyObject["Hash"sv]; - Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); - if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError()) + RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); + if (RequestKey.Key.Bucket.empty() || HashField.HasError()) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; } - KeyRequestData* KeyRequest = nullptr; - if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key) - { - KeyRequest = &KeyRequests.emplace_back(); - KeyRequest->Upstream.Key = Chunk.Upstream.Key; - PreviousKeyRequest = KeyRequest; - } - else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key)) + RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); + RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); + RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + Request.RequestedSize = RequestKey.RawSize; + Request.RequestedOffset = RequestKey.RawOffset; + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + Request.IsRecordRequest = (bool)RequestKey.ValueId; + + if (!Request.IsRecordRequest) { - KeyRequest = PreviousKeyRequest; + ValueRequests.push_back(&Request); } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - PreviousKeyRequest->Upstream.Key.Bucket, - PreviousKeyRequest->Upstream.Key.Hash); - return false; - } - Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest); - - Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash(); - Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId(); - Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId; + RecordRequests.push_back(&Request); + CacheKeyRequest* RecordKey = nullptr; + Record* Record = nullptr; - if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero) - { - KeyRequest->DownstreamPolicy = - KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy; - KeyRequest->HasRequest = true; - (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true; + if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) + { + RecordKey = &RecordKeys.emplace_back(); + PreviousRecordKey = RecordKey; + Record = &Records.emplace_back(); + PreviousRecord = Record; + RecordKey->Key = RequestKey.Key; + } + else if (RequestKey.Key == PreviousRecordKey->Key) + { + RecordKey = PreviousRecordKey; + Record = PreviousRecord; + } + else + { + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + RequestKey.Key.Bucket, + RequestKey.Key.Hash, + PreviousRecordKey->Key.Bucket, + PreviousRecordKey->Key.Hash); + return false; + } + Request.Record = Record; + if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) + { + Record->DownstreamPolicy = + Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; + Record->HasRequest = true; + } } } - if (Chunks.empty()) + if (Requests.empty()) { return false; } - for (ChunkRequestData& Chunk : Chunks) - { - Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex]; - } return true; } void -HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests) +HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; std::vector<CacheKeyRequest*> UpstreamRecordRequests; - std::vector<KeyRequestData*> UpstreamValueRequests; - for (KeyRequestData& KeyRequest : KeyRequests) + for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { - if (KeyRequest.HasRequest) + CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; + Record& Record = Records[RecordIndex]; + if (Record.HasRequest) { - if (KeyRequest.HasRecordRequest) - { - KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta; - } + Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; - if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal)) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { - // There's currently no interface for checking only whether a CacheValue exists without loading it, - // so we load it here even if SkipData is true and its a CacheValue request. ZenCacheValue CacheValue; - if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue)) + if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { - KeyRequest.Exists = true; - KeyRequest.CacheValue = std::move(CacheValue.Value); - KeyRequest.Source = "LOCAL"sv; + Record.Exists = true; + Record.CacheValue = std::move(CacheValue.Value); + Record.Source = "LOCAL"sv; } } - if (!KeyRequest.Exists) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { - // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't - // know which, - // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both - // kinds and pass the request that uses the one that succeeds. - if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote)) - { - KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy)); - UpstreamRecordRequests.push_back(&KeyRequest.Upstream); - } - if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote)) - { - UpstreamValueRequests.push_back(&KeyRequest); - } + RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); + UpstreamRecordRequests.push_back(&RecordKey); } } } if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + Record& Record = Records[RecordIndex]; - KeyRequestData& KeyRequest = - *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream)); - const CacheKey& Key = KeyRequest.Upstream.Key; - KeyRequest.Exists = true; + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; CbObject ObjectBuffer = CbObject::Clone(Params.Record); - KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject); - KeyRequest.Source = "UPSTREAM"sv; + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = "UPSTREAM"sv; - if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } - if (!UpstreamValueRequests.empty()) - { - for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests) - { - KeyRequestData& KeyRequest = *KeyRequestPtr; - CacheKey& Key = KeyRequest.Upstream.Key; - GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); - if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) - { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); - if (Result) - { - KeyRequest.CacheValue = std::move(UpstreamResult.Value); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary); - KeyRequest.Exists = true; - KeyRequest.Source = "UPSTREAM"sv; - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); - } - } - } - } - } -} - -void -HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks) -{ - using namespace GetCacheChunks::detail; - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest* Request : RecordRequests) { - if (Chunk.IsRecordRequest) + if (Request->Key->ChunkId == IoHash::Zero) { - if (Chunk.Upstream.ChunkId == IoHash::Zero) + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, parse the cache record and try to find the raw hash from the ValueId. + Record& Record = *Request->Record; + if (!Record.ValuesRead) { - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, parse the cache record and try to find the raw hash from the ValueId. - KeyRequestData& KeyRequest = *Chunk.KeyRequest; - if (!KeyRequest.ValuesRead) + Record.ValuesRead = true; + if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - KeyRequest.ValuesRead = true; - if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject) + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - KeyRequest.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); - } + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); } } } + } - for (const ValueData& Value : KeyRequest.Values) + for (const RecordValue& Value : Record.Values) + { + if (Value.ValueId == Request->Key->ValueId) { - if (Value.ValueId == Chunk.Upstream.ValueId) - { - Chunk.Upstream.ChunkId = Value.ContentId; - Chunk.TotalSize = Value.RawSize; - Chunk.TotalSizeKnown = true; - break; - } + Request->Key->ChunkId = Value.ContentId; + Request->TotalSize = Value.RawSize; + Request->TotalSizeKnown = true; + break; } } + } - // Now load the ContentId from the local ContentIdStore or from the upstream - if (Chunk.Upstream.ChunkId != IoHash::Zero) + // Now load the ContentId from the local ContentIdStore or from the upstream + if (Request->Key->ChunkId != IoHash::Zero) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal)) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown) + if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { - if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId)) - { - Chunk.Exists = true; - Chunk.Source = "LOCAL"sv; - } + Request->Exists = true; + Request->Source = "LOCAL"sv; } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId)) + } + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (Compressed) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); - if (Compressed) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) - { - Chunk.Value = Compressed; - } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = "LOCAL"sv; + Request->Value = Compressed; } + Request->Exists = true; + Request->TotalSize = Compressed.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } - if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote)) - { - Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy); - UpstreamPayloadRequests.push_back(&Chunk.Upstream); - } + } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); + OutUpstreamChunks.push_back(Request->Key); } } - else + } +} + +void +HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) +{ + using namespace cache::detail; + + for (ChunkRequest* Request : ValueRequests) + { + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (Chunk.KeyRequest->Exists) + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { - if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType())) + if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue)); - if (Compressed) + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + if (Result) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = Compressed; + Request->Value = Result; } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = Chunk.KeyRequest->Source; - Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash()); + Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); + Request->Exists = true; + Request->TotalSize = Result.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } } } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) + { + // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally + Request->Key->RawOffset = 0; + Request->Key->RawSize = UINT64_MAX; + } + OutUpstreamChunks.push_back(Request->Key); + } } +} + +void +HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests) +{ + using namespace cache::detail; - if (!UpstreamPayloadRequests.empty()) + if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; } - ChunkRequestData& Chunk = - *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream)); - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) || - !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + CacheChunkRequest& Key = Params.Request; + size_t RequestIndex = std::distance(RequestKeys.data(), &Key); + ChunkRequest& Request = Requests[RequestIndex]; + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || + !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize) + if (!Compressed || Compressed.GetRawSize() != Params.RawSize || + IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) { return; } - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + if (Request.IsRecordRequest) + { + m_CidStore.AddChunk(Compressed); + } + else + { + m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + } } - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = std::move(Compressed); + Request.Value = std::move(Compressed); } } - Chunk.Exists = true; - Chunk.TotalSize = Params.RawSize; - Chunk.TotalSizeKnown = true; - Chunk.Source = "UPSTREAM"sv; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.TotalSize = Params.RawSize; + Request.TotalSizeKnown = true; + Request.Source = "UPSTREAM"sv; m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - zen::HttpServerRequest& HttpRequest) +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; CbPackage RpcResponse; CbObjectWriter Writer; Writer.BeginArray("Result"sv); - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest& Request : Requests) { Writer.BeginObject(); { - if (Chunk.Exists) + if (Request.Exists) { - Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId); - if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + Writer.AddHash("RawHash"sv, Request.Key->ChunkId); + if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); + RpcResponse.AddAttachment(CbAttachment(Request.Value)); } else { - Writer.AddInteger("RawSize"sv, Chunk.TotalSize); + Writer.AddInteger("RawSize"sv, Request.TotalSize); } - ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - Chunk.Upstream.ValueId, - NiceBytes(Chunk.TotalSize), - Chunk.IsRecordRequest ? "Record"sv : "Value"sv, - Chunk.Source); + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceBytes(Request.TotalSize), + Request.IsRecordRequest ? "Record"sv : "Value"sv, + Request.Source); m_CacheStats.HitCount++; } - else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query)) + else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 9d60c5bf4..3477e6c4f 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -17,6 +17,8 @@ class logger; namespace zen { +struct CacheChunkRequest; +struct CacheKeyRequest; class CasStore; class CidStore; class CbObjectView; @@ -26,10 +28,10 @@ class UpstreamCache; class ZenCacheStore; enum class CachePolicy : uint32_t; -namespace GetCacheChunks::detail { - struct KeyRequestData; - struct ChunkRequestData; -} // namespace GetCacheChunks::detail +namespace cache::detail { + struct Record; + struct ChunkRequest; +} // namespace cache::detail /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -114,12 +116,27 @@ private: virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); - bool TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, - std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - CbObjectView RpcRequest); - void GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests); - void GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks); - void GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, zen::HttpServerRequest& HttpRequest); + /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ + bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest); + /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ + void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ + void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ + void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests); + /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ + void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h index b249982fc..9a745e42c 100644 --- a/zenutil/include/zenutil/cache/cachepolicy.h +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -10,7 +10,6 @@ #include <gsl/gsl-lite.hpp> #include <span> - namespace zen::Private { class ICacheRecordPolicyShared; } |