diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-29 09:25:29 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-09-29 00:25:29 -0700 |
| commit | f87631dd0d283ad04eb3873c1a7510a9d1423792 (patch) | |
| tree | fb6da8ce9b1e55beff73c82d8943dc0303c7ae05 | |
| parent | De/more upstream details (#168) (diff) | |
| download | zen-f87631dd0d283ad04eb3873c1a7510a9d1423792.tar.xz zen-f87631dd0d283ad04eb3873c1a7510a9d1423792.zip | |
Format all rpc package responses using `FormatPackageMessageBuffer` to avoid memory copy (#174)
When reading upstream, fall back to old rpc response to handle older instances.
| -rw-r--r-- | CHANGELOG.md | 3 | ||||
| -rw-r--r-- | zenhttp/httpshared.cpp | 11 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpshared.h | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 21 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 36 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 32 |
6 files changed, 60 insertions, 45 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 57b655c00..9ae0116d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,10 @@ ## +- Change: All RPC responses are now formatted using dedicated wire format, Zen server has fallback to enable compatability with legacy upstreams - Feature: Adding a `.json` extension to the `--abslog` option will make zenserver log in json format to file - Feature: Create release in Sentry and use `sentry_options_set_release` to associate the executable - Bugfix: CompactBinary: Fixed LoadCompactBinary to gracefully handle read failures and sizes larger than the archive. From https://p4-swarm.epicgames.net/changes/21983905 +- Improvement: Logging: don't do formatting of messages the will not be logged +- Improvement: Logging: Timing and upstream source information in upstream logging when debug level logging is enabled ## v0.1.5 - Bugfix: Don't fail entire request if GetCacheValue from Horde fails for a single value diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index 769b5e3f3..2b96f32fc 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -339,6 +339,17 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint return Package; } +bool +ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage) +{ + if (IsPackageMessage(Response)) + { + OutPackage = ParsePackageMessage(Response); + return true; + } + return OutPackage.TryLoad(Response); +} + CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }) { } diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h index 24ce0c85a..0265d8d1e 100644 --- a/zenhttp/include/zenhttp/httpshared.h +++ b/zenhttp/include/zenhttp/httpshared.h @@ -93,6 +93,8 @@ CbPackage ParsePackageMessage( }); bool IsPackageMessage(IoBuffer Payload); +bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage); + std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0c519dc7e..61c1e2df1 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1389,8 +1389,8 @@ TEST_CASE("zcache.rpc") if (Result.status_code == 200) { - CbPackage Response; - if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + if (!Response.IsNull()) { OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); @@ -1651,8 +1651,8 @@ TEST_CASE("zcache.failing.upstream") if (Result.status_code == 200) { - CbPackage Response; - if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + if (!Response.IsNull()) { OutResult.Response = std::move(Response); CHECK(OutResult.Result.Parse(OutResult.Response)); @@ -2087,8 +2087,8 @@ TEST_CASE("zcache.rpc.allpolicies") cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed."); - CbPackage Response; - bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load."); cacherequests::GetCacheRecordsResult RequestResult; CHECK(RequestResult.Parse(Response)); @@ -2166,8 +2166,9 @@ TEST_CASE("zcache.rpc.allpolicies") cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed."); - CbPackage Response; - bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + CbPackage Response = ParsePackageMessage(MessageBuffer); + bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load."); cacherequests::GetCacheValuesResult GetCacheValuesResult; CHECK(GetCacheValuesResult.Parse(Response)); @@ -2234,8 +2235,8 @@ TEST_CASE("zcache.rpc.allpolicies") cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed."); - CbPackage Response; - bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + bool Loaded = !Response.IsNull(); CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load."); cacherequests::GetCacheChunksResult GetCacheChunksResult; CHECK(GetCacheChunksResult.Parse(Response)); diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index cd679d186..75fb8ac41 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1211,12 +1211,9 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - BinaryWriter MemStream; - RpcResponse.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } HttpStructuredCacheService::PutResult @@ -1635,12 +1632,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ResponseObject.EndArray(); ResponsePackage.SetObject(ResponseObject.Save()); - BinaryWriter MemStream; - ResponsePackage.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } void @@ -1748,12 +1742,9 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - BinaryWriter MemStream; - RpcResponse.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } void @@ -1943,13 +1934,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } namespace cache::detail { @@ -2451,12 +2438,9 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view RpcResponse.SetObject(Writer.Save()); - BinaryWriter MemStream; - RpcResponse.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } void diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c1c0395e7..610946385 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -15,6 +15,8 @@ #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/httpshared.h> + #include <zenstore/cidstore.h> #include <auth/authmgr.h> @@ -887,7 +889,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -899,12 +900,13 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (Results.Num() != Requests.size()) { - ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from Upstream."); } else { @@ -921,6 +923,10 @@ namespace detail { return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream."); + } } for (CacheKeyRequest* Request : Requests) @@ -1004,7 +1010,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -1016,12 +1021,13 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheValueRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream."); } else { @@ -1068,6 +1074,10 @@ namespace detail { return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream."); + } } for (CacheValueRequest* RequestPtr : CacheValueRequests) @@ -1135,7 +1145,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -1147,12 +1156,13 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheChunkRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream."); } else { @@ -1199,6 +1209,10 @@ namespace detail { return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream."); + } } for (CacheChunkRequest* RequestPtr : CacheChunkRequests) |