aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-10-13 10:40:26 +0200
committerGitHub <[email protected]>2022-10-13 01:40:26 -0700
commit0c7fbe43ed582cd791191d6c0935cd8693e1208e (patch)
tree77757d193734f8927d474c5a072ffe4af8579513 /zenserver/cache/structuredcache.cpp
parentdisable project store GC (#179) (diff)
downloadzen-0c7fbe43ed582cd791191d6c0935cd8693e1208e.tar.xz
zen-0c7fbe43ed582cd791191d6c0935cd8693e1208e.zip
Add "Accept" field in RPC request to gracefully handle requests from older instances (#180)
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp118
1 files changed, 97 insertions, 21 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 75fb8ac41..0e2462a4a 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1159,12 +1159,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
CbObjectView BatchObject = BatchRequest.GetObject();
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
CbObjectView Params = BatchObject["Params"sv].AsObjectView();
CachePolicy DefaultPolicy;
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
-
std::string_view PolicyText = Params["DefaultPolicy"].AsString();
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
@@ -1211,9 +1212,20 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
HttpStructuredCacheService::PutResult
@@ -1302,9 +1314,12 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+ uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+
struct ValueRequestData
{
Oid ValueId;
@@ -1632,9 +1647,20 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
ResponseObject.EndArray();
ResponsePackage.SetObject(ResponseObject.Save());
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ ResponsePackage.Save(MemStream);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
void
@@ -1642,10 +1668,11 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
{
ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
CbObjectView BatchObject = BatchRequest.GetObject();
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
@@ -1742,9 +1769,20 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
void
@@ -1752,6 +1790,10 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
{
ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+
+ uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
@@ -1771,8 +1813,6 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
};
std::vector<RequestData> Requests;
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
-
std::vector<size_t> RemoteRequestIndexes;
for (CbFieldView RequestField : Params["Requests"sv])
@@ -1934,9 +1974,21 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ResponseObject.EndArray();
RpcResponse.SetObject(ResponseObject.Save());
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
namespace cache::detail {
@@ -1990,9 +2042,18 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+ uint32_t AcceptMagic = 0;
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ if (!ParseGetCacheChunksRequest(AcceptMagic,
+ Namespace,
+ RecordKeys,
+ Records,
+ RequestKeys,
+ Requests,
+ RecordRequests,
+ ValueRequests,
+ RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
@@ -2008,11 +2069,12 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest);
+ WriteGetCacheChunksResponse(AcceptMagic, Namespace, Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
+HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
+ std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
@@ -2025,6 +2087,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Nam
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
+ AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
@@ -2375,7 +2439,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
}
void
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace,
+HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic,
+ std::string_view Namespace,
std::vector<cache::detail::ChunkRequest>& Requests,
zen::HttpServerRequest& HttpRequest)
{
@@ -2438,9 +2503,20 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view
RpcResponse.SetObject(Writer.Save());
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
void