From 9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8 Mon Sep 17 00:00:00 2001 From: mattpetersepic <58296718+mattpetersepic@users.noreply.github.com> Date: Thu, 27 Jan 2022 09:16:05 -0700 Subject: Add batched CacheRecord put rpc (#38) * Add batched CacheRecord put rpc --- zenserver/cache/structuredcache.cpp | 195 ++++++++++++++++++++++++++++++++---- 1 file changed, 178 insertions(+), 17 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0f385116b..cb29b3645 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -55,6 +56,13 @@ struct AttachmentCount uint32_t Total = 0; }; +struct PutRequestData +{ + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -787,27 +795,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); - if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = RpcRequest["Method"sv].AsString(); - if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); - } - else if (Method == "GetCacheValues"sv) - { - HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); - } - else - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - } - }); + Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == HttpContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + const std::string_view Method = Object["Method"sv].AsString(); + if (Method == "PutCacheRecords"sv) + { + HandleRpcPutCacheRecords(AsyncRequest, Package); + } + else if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, Object); + } + else if (Method == "GetCacheValues"sv) + { + HandleRpcGetCachePayloads(AsyncRequest, Object); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -816,6 +841,142 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } } +void +HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + std::vector ValidAttachments; + AttachmentCount Count; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + } + else + { + ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + + ZenCacheValue CacheValue; + MemoryView RecordView = Record.GetView(); + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .PayloadIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { -- cgit v1.2.3