diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 353 |
1 files changed, 249 insertions, 104 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 59cc74d53..a1b0e1549 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -231,8 +231,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidCount != AttachmentCount) { - Success = false; - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + // Success = false; + ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), @@ -554,12 +554,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } }); - const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); + const bool IsPartialCacheRecord = ValidAttachments.size() != Attachments.size(); - if (!AttachmentsValid) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); - } + // if (!AttachmentsValid) + //{ + // return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + //} ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments", Ref.BucketSegment, @@ -574,7 +574,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (StoreUpstream) + if (StoreUpstream && !IsPartialCacheRecord) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, @@ -786,15 +786,11 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, { using namespace fmt::literals; - const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; - case kGet: + case kPost: { const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); @@ -807,122 +803,271 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request, CbObject BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload()); const std::string_view Method = BatchRequest["method"sv].AsString(); - if (Method != "getcacherecords"sv) + if (Method == "get-cache-records"sv) { - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Invalid method '{}'"_format(Method)); + HandleBatchGetCacheRecords(Request, BatchRequest, Policy); } + else if (Method == "get-cache-chunks"sv) + { + HandleBatchGetCacheChunks(Request, BatchRequest, Policy); + } + else + { + Request.WriteResponse(HttpResponseCode::BadRequest); + } + } + break; + default: + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} - CbObjectView Params = BatchRequest["params"sv].AsObjectView(); +void +HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy) +{ + using namespace fmt::literals; - std::vector<CacheKey> CacheKeys; - std::vector<IoBuffer> CacheValues; - std::vector<IoBuffer> Payloads; - std::vector<size_t> Missing; + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - for (CbFieldView QueryView : Params["cachekeys"sv]) - { - CbObjectView Query = QueryView.AsObjectView(); - CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["key"sv].AsHash())); - } + const std::string_view Method = BatchRequest["method"sv].AsString(); + ZEN_ASSERT(Method == "get-cache-records"sv); - CacheValues.resize(CacheKeys.size()); + CbObjectView Params = BatchRequest["params"sv].AsObjectView(); - for (size_t Idx = 0; const CacheKey& Key : CacheKeys) - { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) - { - CbObjectView CacheRecord(CacheValue.Value.Data()); + std::vector<CacheKey> CacheKeys; + std::vector<IoBuffer> CacheValues; + std::vector<IoBuffer> Payloads; + std::vector<size_t> Missing; - if (!SkipAttachments) - { - CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Payloads.push_back(Chunk); - } - }); - } + for (CbFieldView QueryView : Params["cachekeys"sv]) + { + CbObjectView Query = QueryView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["hash"sv].AsHash())); + } - CacheValues[Idx] = CacheValue.Value; - } - else + CacheValues.resize(CacheKeys.size()); + + for (size_t Idx = 0; const CacheKey& Key : CacheKeys) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + CbObjectView CacheRecord(CacheValue.Value.Data()); + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &Payloads](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Missing.push_back(Idx); + Payloads.push_back(Chunk); } + }); + } - ++Idx; - } + CacheValues[Idx] = CacheValue.Value; + } + else + { + Missing.push_back(Idx); + } + + ++Idx; + } - if (QueryUpstream) + if (!Missing.empty() && QueryUpstream) + { + auto UpstreamResult = m_UpstreamCache->GetCacheRecords( + CacheKeys, + Missing, + [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { + CbPackage UpstreamPackage; + if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) { - auto UpstreamResult = m_UpstreamCache->GetCacheRecords( - CacheKeys, - Missing, - [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { - CbPackage UpstreamPackage; - if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) + CbObjectView CacheRecord = UpstreamPackage.GetObject(); + + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) { - CbObjectView CacheRecord = UpstreamPackage.GetObject(); - - CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash())) - { - if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) - { - m_CidStore.AddChunk(Chunk); - - if (!SkipAttachments) - { - Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - } - } - } - }); - - CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); + m_CidStore.AddChunk(Chunk); + + if (!SkipAttachments) + { + Payloads.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + } } - }); + } + }); + + CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); } + }); + } - CbObjectWriter BatchResponse; + CbObjectWriter BatchResponse; - BatchResponse.BeginArray("result"sv); - for (const IoBuffer& Value : CacheValues) - { - if (Value) - { - BatchResponse << CbObjectView(Value.Data()); - } - else - { - BatchResponse.AddNull(); - } - } - BatchResponse.EndArray(); + BatchResponse.BeginArray("result"sv); + for (const IoBuffer& Value : CacheValues) + { + if (Value) + { + CbObjectView Record(Value.Data()); + BatchResponse << Record; + } + else + { + BatchResponse.AddNull(); + } + } + BatchResponse.EndArray(); - CbPackage Package; - Package.SetObject(BatchResponse.Save()); + CbPackage Package; + Package.SetObject(BatchResponse.Save()); - for (const IoBuffer& Payload : Payloads) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload)))); - } + for (const IoBuffer& Payload : Payloads) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Payload)))); + } + + BinaryWriter MemStream; + Package.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void +HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy) +{ + using namespace fmt::literals; - BinaryWriter MemStream; - Package.Save(MemStream); + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); + + const std::string_view Method = BatchRequest["method"sv].AsString(); + ZEN_ASSERT(Method == "get-cache-chunks"sv); - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + CbObjectView Params = BatchRequest["params"sv].AsObjectView(); + + const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { + if (CbObjectView ValueObject = Record["Value"].AsObjectView()) + { + const Oid Id = ValueObject["Id"].AsObjectId(); + if (Id == PayloadId) + { + return ValueObject["RawHash"sv].AsHash(); } - break; - default: - Request.WriteResponse(HttpResponseCode::BadRequest); - break; + } + + for (CbFieldView AttachmentView : Record["Attachments"sv]) + { + CbObjectView AttachmentObject = AttachmentView.AsObjectView(); + const Oid Id = AttachmentObject["Id"].AsObjectId(); + + if (Id == PayloadId) + { + return AttachmentObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + std::vector<CacheChunk> CacheChunks; + std::vector<CachePolicy> ChunkPolicies; + std::vector<size_t> Missing; + + for (CbFieldView ChunkView : Params["cachechunks"sv]) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + CbObjectView CacheKeyObject = Chunk["key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(CacheKeyObject["bucket"sv].AsString(), CacheKeyObject["hash"sv].AsHash()); + const Oid PayloadId = Chunk["id"sv].AsObjectId(); + const uint64_t RawOffset = Chunk["rawoffset"sv].AsUInt64(); + const uint64_t RawSize = Chunk["rawsize"sv].AsUInt64(); + const uint32_t ChunkPolicy = Chunk["policy"sv].AsUInt32(); + + IoHash ChunkId = IoHash::Zero; + + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + ChunkId = GetChunkIdFromPayloadId(CbObjectView(CacheValue.Value.Data()), PayloadId); + } + + CacheChunks.emplace_back(Key, ChunkId, RawOffset, RawSize); + ChunkPolicies.emplace_back(static_cast<CachePolicy>(ChunkPolicy)); + } + + if (CacheChunks.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<IoBuffer> Chunks; + Chunks.resize(CacheChunks.size()); + + for (size_t Idx = 0; CacheChunk & CacheChunk : CacheChunks) + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(CacheChunk.Id)) + { + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + CacheChunk.Key.Bucket, + CacheChunk.Key.Hash, + CacheChunk.Id, + NiceBytes(Chunk.Size()), + ToString(Chunk.GetContentType()), + "LOCAL"); + + Chunks[Idx] = Chunk; + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", + CacheChunk.Key.Bucket, + CacheChunk.Key.Hash, + CacheChunk.Id, + ToString(HttpContentType::kCompressedBinary)); + + CacheChunk.Id = IoHash::Zero; + Missing.push_back(Idx); + } + ++Idx; + } + + CbPackage Package; + CbObjectWriter BatchResponse; + + BatchResponse.BeginArray("result"sv); + + for (size_t Idx = 0; Idx < Chunks.size(); ++Idx) + { + if (Chunks[Idx]) + { + BatchResponse << CacheChunks[Idx].Id; + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[Idx]))))); + } + else + { + BatchResponse << IoHash::Zero; + } } + BatchResponse.EndArray(); + + Package.SetObject(BatchResponse.Save()); + + BinaryWriter MemStream; + Package.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void |