diff options
| author | mattpetersepic <[email protected]> | 2022-02-09 09:03:37 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-02-09 09:03:37 -0700 |
| commit | adc7927757c1f917a00032268ba1fa1e26562d4f (patch) | |
| tree | 2747a0ceec1b92913d06db2740774f2d0e016b6a /zenserver/cache/structuredcache.cpp | |
| parent | Remove the backwards compatibility for the Zen CachePolicy changes no… (#49) (diff) | |
| download | zen-adc7927757c1f917a00032268ba1fa1e26562d4f.tar.xz zen-adc7927757c1f917a00032268ba1fa1e26562d4f.zip | |
Simplify HandleRpcGetCacheChunks (#53)
Refactor HandleRpcGetCacheChunks to reduce complexity. Port CacheStore tests from Unreal.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 552 |
1 files changed, 292 insertions, 260 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index e23030e24..3d5359188 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1537,433 +1537,465 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } -namespace GetCacheChunks::detail { +namespace cache::detail { - struct ValueData + struct RecordValue { Oid ValueId; IoHash ContentId; uint64_t RawSize; }; - struct KeyRequestData - { - CacheKeyRequest Upstream; - IoBuffer CacheValue; - std::vector<ValueData> Values; - CachePolicy DownstreamRecordPolicy; - CachePolicy DownstreamPolicy; - std::string_view Source; - bool Exists = false; - bool HasRequest = false; - bool HasRecordRequest = false; - bool HasValueRequest = false; - bool ValuesRead = false; + struct Record + { + IoBuffer CacheValue; + std::vector<RecordValue> Values; + std::string_view Source; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool HasRequest = false; + bool ValuesRead = false; }; - struct ChunkRequestData - { - CacheChunkRequest Upstream; - KeyRequestData* KeyRequest; - size_t KeyRequestIndex; - CachePolicy DownstreamPolicy; - CompressedBuffer Value; - std::string_view Source; - uint64_t TotalSize = 0; - bool Exists = false; - bool IsRecordRequest = false; - bool TotalSizeKnown = false; + struct ChunkRequest + { + CacheChunkRequest* Key = nullptr; + Record* Record = nullptr; + CompressedBuffer Value; + std::string_view Source; + uint64_t TotalSize = 0; + uint64_t RequestedSize = 0; + uint64_t RequestedOffset = 0; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool TotalSizeKnown = false; + bool IsRecordRequest = false; }; -} // namespace GetCacheChunks::detail +} // namespace cache::detail void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); - std::vector<KeyRequestData> KeyRequests; - std::vector<ChunkRequestData> Chunks; - if (!TryGetCacheChunks_Parse(KeyRequests, Chunks, RpcRequest)) + std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream + std::vector<Record> Records; // Scratch-space data about a Record when fulfilling RecordRequests + std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream + std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest + std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key + std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key + std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + + // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests + if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } - GetCacheChunks_LoadKeys(KeyRequests); - GetCacheChunks_LoadChunks(Chunks); - GetCacheChunks_SendResults(Chunks, HttpRequest); + + // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we + // have it locally, and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + + // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheValues(ValueRequests, UpstreamChunks); + + // Call GetCacheChunks on the upstream for any payloads we do not have locally + GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + + // Send the payload and descriptive data about each chunk to the client + WriteGetCacheChunksResponse(Requests, HttpRequest); } bool -HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, - std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - CbObjectView RpcRequest) +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + + // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, + // we will need to change the pointers to indexes to handle reallocations. + RecordKeys.reserve(NumRequests); + Records.reserve(NumRequests); + RequestKeys.reserve(NumRequests); + Requests.reserve(NumRequests); + RecordRequests.reserve(NumRequests); + ValueRequests.reserve(NumRequests); + + CacheKeyRequest* PreviousRecordKey = nullptr; + Record* PreviousRecord = nullptr; - KeyRequestData* PreviousKeyRequest = nullptr; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - Chunks.reserve(ChunkRequestsArray.Num()); for (CbFieldView RequestView : ChunkRequestsArray) { - ChunkRequestData& Chunk = Chunks.emplace_back(); - CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); + ChunkRequest& Request = Requests.emplace_back(); + Request.Key = &RequestKey; CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView HashField = KeyObject["Hash"sv]; - Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); - if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError()) + RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); + if (RequestKey.Key.Bucket.empty() || HashField.HasError()) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; } - KeyRequestData* KeyRequest = nullptr; - if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key) - { - KeyRequest = &KeyRequests.emplace_back(); - KeyRequest->Upstream.Key = Chunk.Upstream.Key; - PreviousKeyRequest = KeyRequest; - } - else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key)) + RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); + RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); + RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + Request.RequestedSize = RequestKey.RawSize; + Request.RequestedOffset = RequestKey.RawOffset; + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + Request.IsRecordRequest = (bool)RequestKey.ValueId; + + if (!Request.IsRecordRequest) { - KeyRequest = PreviousKeyRequest; + ValueRequests.push_back(&Request); } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - PreviousKeyRequest->Upstream.Key.Bucket, - PreviousKeyRequest->Upstream.Key.Hash); - return false; - } - Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest); - - Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash(); - Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId(); - Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId; + RecordRequests.push_back(&Request); + CacheKeyRequest* RecordKey = nullptr; + Record* Record = nullptr; - if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero) - { - KeyRequest->DownstreamPolicy = - KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy; - KeyRequest->HasRequest = true; - (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true; + if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) + { + RecordKey = &RecordKeys.emplace_back(); + PreviousRecordKey = RecordKey; + Record = &Records.emplace_back(); + PreviousRecord = Record; + RecordKey->Key = RequestKey.Key; + } + else if (RequestKey.Key == PreviousRecordKey->Key) + { + RecordKey = PreviousRecordKey; + Record = PreviousRecord; + } + else + { + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + RequestKey.Key.Bucket, + RequestKey.Key.Hash, + PreviousRecordKey->Key.Bucket, + PreviousRecordKey->Key.Hash); + return false; + } + Request.Record = Record; + if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) + { + Record->DownstreamPolicy = + Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; + Record->HasRequest = true; + } } } - if (Chunks.empty()) + if (Requests.empty()) { return false; } - for (ChunkRequestData& Chunk : Chunks) - { - Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex]; - } return true; } void -HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests) +HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::Record>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; std::vector<CacheKeyRequest*> UpstreamRecordRequests; - std::vector<KeyRequestData*> UpstreamValueRequests; - for (KeyRequestData& KeyRequest : KeyRequests) + for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { - if (KeyRequest.HasRequest) + CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; + Record& Record = Records[RecordIndex]; + if (Record.HasRequest) { - if (KeyRequest.HasRecordRequest) - { - KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta; - } + Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; - if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal)) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { - // There's currently no interface for checking only whether a CacheValue exists without loading it, - // so we load it here even if SkipData is true and its a CacheValue request. ZenCacheValue CacheValue; - if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue)) + if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { - KeyRequest.Exists = true; - KeyRequest.CacheValue = std::move(CacheValue.Value); - KeyRequest.Source = "LOCAL"sv; + Record.Exists = true; + Record.CacheValue = std::move(CacheValue.Value); + Record.Source = "LOCAL"sv; } } - if (!KeyRequest.Exists) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { - // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't - // know which, - // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both - // kinds and pass the request that uses the one that succeeds. - if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote)) - { - KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy)); - UpstreamRecordRequests.push_back(&KeyRequest.Upstream); - } - if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote)) - { - UpstreamValueRequests.push_back(&KeyRequest); - } + RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); + UpstreamRecordRequests.push_back(&RecordKey); } } } if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + Record& Record = Records[RecordIndex]; - KeyRequestData& KeyRequest = - *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream)); - const CacheKey& Key = KeyRequest.Upstream.Key; - KeyRequest.Exists = true; + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; CbObject ObjectBuffer = CbObject::Clone(Params.Record); - KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject); - KeyRequest.Source = "UPSTREAM"sv; + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = "UPSTREAM"sv; - if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } - if (!UpstreamValueRequests.empty()) - { - for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests) - { - KeyRequestData& KeyRequest = *KeyRequestPtr; - CacheKey& Key = KeyRequest.Upstream.Key; - GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); - if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) - { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); - if (Result) - { - KeyRequest.CacheValue = std::move(UpstreamResult.Value); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary); - KeyRequest.Exists = true; - KeyRequest.Source = "UPSTREAM"sv; - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); - } - } - } - } - } -} - -void -HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks) -{ - using namespace GetCacheChunks::detail; - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest* Request : RecordRequests) { - if (Chunk.IsRecordRequest) + if (Request->Key->ChunkId == IoHash::Zero) { - if (Chunk.Upstream.ChunkId == IoHash::Zero) + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, parse the cache record and try to find the raw hash from the ValueId. + Record& Record = *Request->Record; + if (!Record.ValuesRead) { - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, parse the cache record and try to find the raw hash from the ValueId. - KeyRequestData& KeyRequest = *Chunk.KeyRequest; - if (!KeyRequest.ValuesRead) + Record.ValuesRead = true; + if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - KeyRequest.ValuesRead = true; - if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject) + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - KeyRequest.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); - } + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); } } } + } - for (const ValueData& Value : KeyRequest.Values) + for (const RecordValue& Value : Record.Values) + { + if (Value.ValueId == Request->Key->ValueId) { - if (Value.ValueId == Chunk.Upstream.ValueId) - { - Chunk.Upstream.ChunkId = Value.ContentId; - Chunk.TotalSize = Value.RawSize; - Chunk.TotalSizeKnown = true; - break; - } + Request->Key->ChunkId = Value.ContentId; + Request->TotalSize = Value.RawSize; + Request->TotalSizeKnown = true; + break; } } + } - // Now load the ContentId from the local ContentIdStore or from the upstream - if (Chunk.Upstream.ChunkId != IoHash::Zero) + // Now load the ContentId from the local ContentIdStore or from the upstream + if (Request->Key->ChunkId != IoHash::Zero) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal)) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown) + if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { - if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId)) - { - Chunk.Exists = true; - Chunk.Source = "LOCAL"sv; - } + Request->Exists = true; + Request->Source = "LOCAL"sv; } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId)) + } + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (Compressed) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); - if (Compressed) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) - { - Chunk.Value = Compressed; - } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = "LOCAL"sv; + Request->Value = Compressed; } + Request->Exists = true; + Request->TotalSize = Compressed.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } - if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote)) - { - Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy); - UpstreamPayloadRequests.push_back(&Chunk.Upstream); - } + } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); + OutUpstreamChunks.push_back(Request->Key); } } - else + } +} + +void +HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) +{ + using namespace cache::detail; + + for (ChunkRequest* Request : ValueRequests) + { + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (Chunk.KeyRequest->Exists) + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { - if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType())) + if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue)); - if (Compressed) + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + if (Result) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = Compressed; + Request->Value = Result; } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = Chunk.KeyRequest->Source; - Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash()); + Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); + Request->Exists = true; + Request->TotalSize = Result.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } } } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) + { + // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally + Request->Key->RawOffset = 0; + Request->Key->RawSize = UINT64_MAX; + } + OutUpstreamChunks.push_back(Request->Key); + } } +} + +void +HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests) +{ + using namespace cache::detail; - if (!UpstreamPayloadRequests.empty()) + if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; } - ChunkRequestData& Chunk = - *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream)); - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) || - !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + CacheChunkRequest& Key = Params.Request; + size_t RequestIndex = std::distance(RequestKeys.data(), &Key); + ChunkRequest& Request = Requests[RequestIndex]; + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || + !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize) + if (!Compressed || Compressed.GetRawSize() != Params.RawSize || + IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) { return; } - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + if (Request.IsRecordRequest) + { + m_CidStore.AddChunk(Compressed); + } + else + { + m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + } } - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = std::move(Compressed); + Request.Value = std::move(Compressed); } } - Chunk.Exists = true; - Chunk.TotalSize = Params.RawSize; - Chunk.TotalSizeKnown = true; - Chunk.Source = "UPSTREAM"sv; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.TotalSize = Params.RawSize; + Request.TotalSizeKnown = true; + Request.Source = "UPSTREAM"sv; m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - zen::HttpServerRequest& HttpRequest) +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; CbPackage RpcResponse; CbObjectWriter Writer; Writer.BeginArray("Result"sv); - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest& Request : Requests) { Writer.BeginObject(); { - if (Chunk.Exists) + if (Request.Exists) { - Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId); - if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + Writer.AddHash("RawHash"sv, Request.Key->ChunkId); + if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); + RpcResponse.AddAttachment(CbAttachment(Request.Value)); } else { - Writer.AddInteger("RawSize"sv, Chunk.TotalSize); + Writer.AddInteger("RawSize"sv, Request.TotalSize); } - ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - Chunk.Upstream.ValueId, - NiceBytes(Chunk.TotalSize), - Chunk.IsRecordRequest ? "Record"sv : "Value"sv, - Chunk.Source); + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceBytes(Request.TotalSize), + Request.IsRecordRequest ? "Record"sv : "Value"sv, + Request.Source); m_CacheStats.HitCount++; } - else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query)) + else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } |