diff options
| author | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
| commit | e53df312f3c4dcef19add9cd26afc324557b1f5a (patch) | |
| tree | a3d7b59f29e484d48edffb2a26bbb0dd2d95533d /zenserver/cache/structuredcache.cpp | |
| parent | gc: implemented timestamped snapshot persistence (diff) | |
| parent | Change error code for failed upsteam apply (diff) | |
| download | zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.tar.xz zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.zip | |
merge from main
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 642 |
1 files changed, 479 insertions, 163 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 177cdbf55..53e1b1c61 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -14,7 +14,9 @@ #include <zencore/timer.h> #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include <zenutil/cache/cache.h> +//#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -36,115 +38,24 @@ using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -namespace detail { namespace cacheopt { - constexpr std::string_view Local = "local"sv; - constexpr std::string_view Remote = "remote"sv; - constexpr std::string_view Data = "data"sv; - constexpr std::string_view Meta = "meta"sv; - constexpr std::string_view Value = "value"sv; - constexpr std::string_view Attachments = "attachments"sv; -}} // namespace detail::cacheopt - -////////////////////////////////////////////////////////////////////////// - -enum class CachePolicy : uint8_t -{ - None = 0, - QueryLocal = 1 << 0, - QueryRemote = 1 << 1, - Query = QueryLocal | QueryRemote, - StoreLocal = 1 << 2, - StoreRemote = 1 << 3, - Store = StoreLocal | StoreRemote, - SkipMeta = 1 << 4, - SkipValue = 1 << 5, - SkipAttachments = 1 << 6, - SkipData = SkipMeta | SkipValue | SkipAttachments, - SkipLocalCopy = 1 << 7, - Local = QueryLocal | StoreLocal, - Remote = QueryRemote | StoreRemote, - Default = Query | Store, - Disable = None, -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); - CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - CachePolicy QueryPolicy = CachePolicy::Query; - - { - std::string_view Opts = QueryParams.GetValue("query"sv); - if (!Opts.empty()) - { - QueryPolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - QueryPolicy |= CachePolicy::QueryLocal; - } - if (Opt == detail::cacheopt::Remote) - { - QueryPolicy |= CachePolicy::QueryRemote; - } - return true; - }); - } - } - - CachePolicy StorePolicy = CachePolicy::Store; - - { - std::string_view Opts = QueryParams.GetValue("store"sv); - if (!Opts.empty()) - { - StorePolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - StorePolicy |= CachePolicy::StoreLocal; - } - if (Opt == detail::cacheopt::Remote) - { - StorePolicy |= CachePolicy::StoreRemote; - } - return true; - }); - } - } - - CachePolicy SkipPolicy = CachePolicy::None; - - { - std::string_view Opts = QueryParams.GetValue("skip"sv); - if (!Opts.empty()) - { - ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Meta) - { - SkipPolicy |= CachePolicy::SkipMeta; - } - if (Opt == detail::cacheopt::Value) - { - SkipPolicy |= CachePolicy::SkipValue; - } - if (Opt == detail::cacheopt::Attachments) - { - SkipPolicy |= CachePolicy::SkipAttachments; - } - if (Opt == detail::cacheopt::Data) - { - SkipPolicy |= CachePolicy::SkipData; - } - return true; - }); - } - } + const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); + const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); + const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); return QueryPolicy | StorePolicy | SkipPolicy; } +struct AttachmentCount +{ + uint32_t New = 0; + uint32_t Valid = 0; + uint32_t Invalid = 0; + uint32_t Total = 0; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -207,6 +118,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); + if (Key == "$rpc") + { + return HandleRpcRequest(Request); + } + if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -319,8 +235,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidCount != AttachmentCount) { - Success = false; - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + // Success = false; + ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), @@ -543,52 +459,39 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; - uint32_t AttachmentCount = 0; + int32_t TotalCount = 0; - CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } - AttachmentCount++; + TotalCount++; }); - const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size()); - const bool ValidCacheRecord = ValidCount == AttachmentCount; - - if (ValidCacheRecord) - { - ZEN_DEBUG("PUT - '{}/{}' {} '{}', {} attachments", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - ValidCount); + ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + TotalCount, + ValidAttachments.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + Body.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - if (StoreUpstream) - { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(ValidAttachments)}); - } + const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - Request.WriteResponse(HttpResponseCode::Created); - } - else + if (StoreUpstream && !IsPartialRecord) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, found {}/{} attachments", - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType), - ValidCount, - AttachmentCount); - - Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv); + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); } + + Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { @@ -600,16 +503,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } - CbObject CacheRecord = Package.GetObject(); - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - std::vector<IoHash> ValidAttachments; - int32_t NewAttachmentCount = 0; + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + std::vector<IoHash> ValidAttachments; - ValidAttachments.reserve(Attachments.size()); + ValidAttachments.reserve(Package.GetAttachments().size()); - CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { @@ -620,8 +522,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (InsertResult.New) { - NewAttachmentCount++; + Count.New++; } + Count.Valid++; } else { @@ -629,40 +532,40 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), - AttachmentHash.AsHash()); + Hash); + Count.Invalid++; } } - else + else if (m_CidStore.ContainsChunk(Hash)) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, missing attachment '{}'", - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - AttachmentHash.AsHash()); + ValidAttachments.emplace_back(Hash); + Count.Valid++; } + Count.Total++; }); - const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); - - if (!AttachmentsValid) + if (Count.Invalid > 0) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } - ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments", + ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), - NewAttachmentCount, - Attachments.size()); + Count.New, + Count.Valid, + Count.Total); IoBuffer CacheRecordValue = CacheRecord.GetBuffer().AsIoBuffer(); CacheRecordValue.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (StoreUpstream) + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (StoreUpstream && !IsPartialRecord) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, @@ -708,8 +611,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -864,6 +766,420 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } void +HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) +{ + switch (auto Verb = Request.RequestVerb()) + { + using enum HttpVerb; + + case kPost: + { + const HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType AcceptType = Request.AcceptContentType(); + + if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Request.WriteResponseAsync( + [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { + const std::string_view Method = RpcRequest["Method"sv].AsString(); + if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); + } + else if (Method == "GetCachePayloads"sv) + { + HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); + } + break; + default: + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} + +void +HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + using namespace fmt::literals; + + CbPackage RpcResponse; + CacheRecordPolicy Policy; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::vector<CacheKey> CacheKeys; + std::vector<IoBuffer> CacheValues; + std::vector<size_t> UpstreamRequests; + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); + + CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); + + const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError); + const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments); + const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache; + + for (CbFieldView KeyView : Params["CacheKeys"sv]) + { + CbObjectView KeyObject = KeyView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); + } + + if (CacheKeys.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CacheValues.resize(CacheKeys.size()); + + for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) + { + ZenCacheValue CacheValue; + uint32_t MissingCount = 0; + + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + CbObjectView CacheRecord(CacheValue.Value.Data()); + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + MissingCount++; + } + }); + } + } + + if (CacheValue.Value && (MissingCount == 0 || PartialOnError)) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(CacheValue.Value.Size()), + ToString(CacheValue.Value.GetContentType())); + + CacheValues[KeyIndex] = std::move(CacheValue.Value); + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(KeyIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + + ++KeyIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCacheRecordGetComplete = + [this, &CacheKeys, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); + + IoBuffer CacheValue; + AttachmentCount Count; + + if (Params.Record) + { + Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + + if (!SkipAttachments) + { + RpcResponse.AddAttachment(CbAttachment(Compressed)); + } + } + else + { + ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'", + HashView.AsHash(), + Params.CacheKey.Bucket, + Params.CacheKey.Hash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(HashView.AsHash())) + { + Count.Valid++; + } + Count.Total++; + }); + + if ((Count.Valid == Count.Total) || PartialOnError) + { + CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); + } + } + + if (CacheValue) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", + Params.CacheKey.Bucket, + Params.CacheKey.Hash, + NiceBytes(CacheValue.GetSize()), + ToString(HttpContentType::kCbPackage), + Count.New, + Count.Valid, + Count.Total); + + CacheValue.SetContentType(ZenContentType::kCbObject); + + CacheValues[Params.KeyIndex] = CacheValue; + m_CacheStore.Put(Params.CacheKey.Bucket, Params.CacheKey.Hash, {.Value = CacheValue}); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + const bool IsPartial = Count.Valid != Count.Total; + ZEN_DEBUG("MISS - '{}/{}' {}", Params.CacheKey.Bucket, Params.CacheKey.Hash, IsPartial ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + } + + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + for (const IoBuffer& Value : CacheValues) + { + if (Value) + { + CbObjectView Record(Value.Data()); + ResponseObject << Record; + } + else + { + ResponseObject.AddNull(); + } + } + ResponseObject.EndArray(); + + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void +HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + using namespace fmt::literals; + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv); + + std::vector<CacheChunkRequest> ChunkRequests; + std::vector<size_t> UpstreamRequests; + std::vector<IoBuffer> Chunks; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + + for (CbFieldView RequestView : Params["ChunkRequests"sv]) + { + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); + const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); + + ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)); + } + + if (ChunkRequests.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Chunks.resize(ChunkRequests.size()); + + // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId) + // is missing, load the cache record and try to find the raw hash from the payload ID. + { + const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { + if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) + { + const Oid Id = ValueObject["Id"sv].AsObjectId(); + if (Id == PayloadId) + { + return ValueObject["RawHash"sv].AsHash(); + } + } + + for (CbFieldView AttachmentView : Record["Attachments"sv]) + { + CbObjectView AttachmentObject = AttachmentView.AsObjectView(); + const Oid Id = AttachmentObject["Id"sv].AsObjectId(); + + if (Id == PayloadId) + { + return AttachmentObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + CacheKey CurrentKey = CacheKey::Empty; + IoBuffer CurrentRecordBuffer; + + for (CacheChunkRequest& ChunkRequest : ChunkRequests) + { + if (ChunkRequest.ChunkId != IoHash::Zero) + { + continue; + } + + if (ChunkRequest.Key != CurrentKey) + { + CurrentKey = ChunkRequest.Key; + + ZenCacheValue CacheValue; + if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue)) + { + CurrentRecordBuffer = CacheValue.Value; + } + } + + if (CurrentRecordBuffer) + { + ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + } + } + } + + for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) + { + const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal; + const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + + if (QueryLocal) + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) + { + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + ChunkRequest.ChunkId, + NiceBytes(Chunk.Size()), + ToString(Chunk.GetContentType()), + "LOCAL"); + + Chunks[RequestIndex] = Chunk; + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(RequestIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + m_CacheStats.MissCount++; + } + } + else + { + ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + } + + ++RequestIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + + ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", + Params.Request.Key.Bucket, + Params.Request.Key.Hash, + Params.Request.ChunkId, + NiceBytes(Params.Payload.GetSize()), + "UPSTREAM"); + + ZEN_ASSERT(Params.RequestIndex < Chunks.size()); + Chunks[Params.RequestIndex] = std::move(Params.Payload); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + + for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) + { + if (Chunks[ChunkIndex]) + { + ResponseObject << ChunkRequests[ChunkIndex].ChunkId; + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + } + else + { + ResponseObject << IoHash::Zero; + } + } + ResponseObject.EndArray(); + + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; |