diff options
| author | mattpetersepic <[email protected]> | 2022-01-27 09:16:05 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-01-27 09:16:05 -0700 |
| commit | 9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8 (patch) | |
| tree | b304de1c944480adcd3fda442afa8734dd3cd589 | |
| parent | Fixed ZenServerEntry::*ListenPort compile errors (diff) | |
| download | zen-9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8.tar.xz zen-9f60e3a53ea082e4dd2553d50c4c03fa9e9a42c8.zip | |
Add batched CacheRecord put rpc (#38)
* Add batched CacheRecord put rpc
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 91 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 195 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 9 |
3 files changed, 248 insertions, 47 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index f51fd1504..85393aed2 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1814,7 +1814,12 @@ TEST_CASE("zcache.rpc") { using namespace std::literals; - auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage { + auto AppendCacheRecord = [](CbPackage& Package, + CbWriter& Writer, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy /* BatchDefaultPolicy */, + CachePolicy RecordPolicy) { std::vector<uint8_t> Data; Data.resize(PayloadSize); for (size_t Idx = 0; Idx < PayloadSize; ++Idx) @@ -1824,17 +1829,24 @@ TEST_CASE("zcache.rpc") zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()))); - zen::CbObjectWriter CacheRecord; - CacheRecord.BeginObject("CacheKey"sv); - CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; - CacheRecord.EndObject(); - CacheRecord << "Data"sv << Attachment; + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; + } + Writer.EndObject(); + Writer << "Data"sv << Attachment; + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + CacheRecordPolicy(RecordPolicy).Save(Writer); + } + Writer.EndObject(); - zen::CbPackage Package; - Package.SetObject(CacheRecord.Save()); Package.AddAttachment(Attachment); - - return Package; }; auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { @@ -1843,27 +1855,46 @@ TEST_CASE("zcache.rpc") return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; - auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri, - std::string_view Query, - std::string_view Bucket, - size_t Num, - size_t PayloadSize = 1024) -> std::vector<CacheKey> { + auto PutCacheRecords = + [&AppendCacheRecord, + &ToIoBuffer](std::string_view BaseUri, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) { - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof(uint32_t))); - CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize); + zen::IoHash KeyHash; + ((uint32_t*)(KeyHash.Hash))[0] = Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + CbPackage Package; + CbWriter Writer; - OutKeys.push_back(CacheKey); + Writer.BeginObject(); + { + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + { + AppendCacheRecord(Package, Writer, CacheKey, PayloadSize, BatchDefaultPolicy, CachePolicy::Default); + } + Writer.EndArray(); + } + Writer.EndObject(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save().AsObject()); - IoBuffer Payload = ToIoBuffer(CacheRecord); + OutKeys.push_back(CacheKey); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}{}", BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)}, - cpr::Body{(const char*)Payload.Data(), Payload.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK(Result.status_code == 201); + CHECK(Result.status_code == 200); } return OutKeys; @@ -1948,7 +1979,7 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CacheRecordPolicy Policy; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "mastodon"sv, 128); GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1958,7 +1989,7 @@ TEST_CASE("zcache.rpc") const CacheKey& ExpectedKey = Keys[Index++]; CbObjectView RecordObj = RecordView.AsObjectView(); - CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); + CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); @@ -1980,7 +2011,7 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CacheRecordPolicy Policy; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "mastodon"sv, 128); std::vector<zen::CacheKey> Keys; for (const zen::CacheKey& Key : ExistingKeys) @@ -2006,7 +2037,7 @@ TEST_CASE("zcache.rpc") { const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; CbObjectView RecordObj = RecordView.AsObjectView(); - zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + zen::CacheKey Key = LoadKey(RecordObj["Key"sv]); const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); @@ -2028,7 +2059,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); CacheRecordPolicy Policy(CachePolicy::QueryLocal); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -2053,7 +2084,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -2064,7 +2095,7 @@ TEST_CASE("zcache.rpc") { const zen::CacheKey& ExpectedKey = Keys[Index++]; CbObjectView RecordObj = RecordView.AsObjectView(); - zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + zen::CacheKey Key = LoadKey(RecordObj["Key"sv]); CHECK(Key == ExpectedKey); } } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0f385116b..cb29b3645 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -15,6 +15,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> #include <zenstore/cas.h> #include <zenutil/cache/cache.h> @@ -55,6 +56,13 @@ struct AttachmentCount uint32_t Total = 0; }; +struct PutRequestData +{ + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -787,27 +795,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); - if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = RpcRequest["Method"sv].AsString(); - if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); - } - else if (Method == "GetCacheValues"sv) - { - HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); - } - else - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - } - }); + Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == HttpContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + const std::string_view Method = Object["Method"sv].AsString(); + if (Method == "PutCacheRecords"sv) + { + HandleRpcPutCacheRecords(AsyncRequest, Package); + } + else if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, Object); + } + else if (Method == "GetCacheValues"sv) + { + HandleRpcGetCachePayloads(AsyncRequest, Object); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -817,6 +842,142 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } void +HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + std::vector<IoHash> ValidAttachments; + AttachmentCount Count; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + } + else + { + ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + + ZenCacheValue CacheValue; + MemoryView RecordView = Record.GetView(); + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .PayloadIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + +void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index a7ecba845..5b6b10898 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -19,6 +19,7 @@ namespace zen { class CasStore; class CidStore; class CbObjectView; +struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; @@ -82,6 +83,12 @@ private: std::atomic_uint64_t UpstreamHitCount{}; std::atomic_uint64_t MissCount{}; }; + enum class PutResult + { + Success, + Fail, + Invalid, + }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); @@ -91,11 +98,13 @@ private: void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); + void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; |