diff options
| author | Dan Engelbrecht <[email protected]> | 2022-10-13 10:40:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-10-13 01:40:26 -0700 |
| commit | 0c7fbe43ed582cd791191d6c0935cd8693e1208e (patch) | |
| tree | 77757d193734f8927d474c5a072ffe4af8579513 /zenserver/cache/structuredcache.cpp | |
| parent | disable project store GC (#179) (diff) | |
| download | zen-0c7fbe43ed582cd791191d6c0935cd8693e1208e.tar.xz zen-0c7fbe43ed582cd791191d6c0935cd8693e1208e.zip | |
Add "Accept" field in RPC request to gracefully handle requests from older instances (#180)
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 118 |
1 files changed, 97 insertions, 21 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 75fb8ac41..0e2462a4a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1159,12 +1159,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); CbObjectView BatchObject = BatchRequest.GetObject(); + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); CachePolicy DefaultPolicy; - ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); if (!Namespace) @@ -1211,9 +1212,20 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + if (AcceptMagic == kCbPkgMagic) + { + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResponse.Save(MemStream); - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } HttpStructuredCacheService::PutResult @@ -1302,9 +1314,12 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); + uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + struct ValueRequestData { Oid ValueId; @@ -1632,9 +1647,20 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ResponseObject.EndArray(); ResponsePackage.SetObject(ResponseObject.Save()); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); + if (AcceptMagic == kCbPkgMagic) + { + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + ResponsePackage.Save(MemStream); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } void @@ -1642,10 +1668,11 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ { ZEN_TRACE_CPU("Z$::RpcPutCacheValues"); CbObjectView BatchObject = BatchRequest.GetObject(); + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32(); - ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; @@ -1742,9 +1769,20 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + if (AcceptMagic == kCbPkgMagic) + { + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResponse.Save(MemStream); - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } void @@ -1752,6 +1790,10 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); + + uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; @@ -1771,8 +1813,6 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http }; std::vector<RequestData> Requests; - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - std::vector<size_t> RemoteRequestIndexes; for (CbFieldView RequestField : Params["Requests"sv]) @@ -1934,9 +1974,21 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + if (AcceptMagic == kCbPkgMagic) + { + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } namespace cache::detail { @@ -1990,9 +2042,18 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + uint32_t AcceptMagic = 0; // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + if (!ParseGetCacheChunksRequest(AcceptMagic, + Namespace, + RecordKeys, + Records, + RequestKeys, + Requests, + RecordRequests, + ValueRequests, + RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } @@ -2008,11 +2069,12 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); + WriteGetCacheChunksResponse(AcceptMagic, Namespace, Requests, HttpRequest); } bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, +HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic, + std::string& Namespace, std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, @@ -2025,6 +2087,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Nam ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); + AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; @@ -2375,7 +2439,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names } void -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, +HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic, + std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest) { @@ -2438,9 +2503,20 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view RpcResponse.SetObject(Writer.Save()); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + if (AcceptMagic == kCbPkgMagic) + { + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResponse.Save(MemStream); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } void |