aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-09-29 09:25:29 +0200
committerGitHub <[email protected]>2022-09-29 00:25:29 -0700
commitf87631dd0d283ad04eb3873c1a7510a9d1423792 (patch)
treefb6da8ce9b1e55beff73c82d8943dc0303c7ae05
parentDe/more upstream details (#168) (diff)
downloadzen-f87631dd0d283ad04eb3873c1a7510a9d1423792.tar.xz
zen-f87631dd0d283ad04eb3873c1a7510a9d1423792.zip
Format all rpc package responses using `FormatPackageMessageBuffer` to avoid memory copy (#174)
When reading upstream, fall back to old rpc response to handle older instances.
-rw-r--r--CHANGELOG.md3
-rw-r--r--zenhttp/httpshared.cpp11
-rw-r--r--zenhttp/include/zenhttp/httpshared.h2
-rw-r--r--zenserver-test/zenserver-test.cpp21
-rw-r--r--zenserver/cache/structuredcache.cpp36
-rw-r--r--zenserver/upstream/upstreamcache.cpp32
6 files changed, 60 insertions, 45 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57b655c00..9ae0116d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,10 @@
##
+- Change: All RPC responses are now formatted using dedicated wire format, Zen server has fallback to enable compatability with legacy upstreams
- Feature: Adding a `.json` extension to the `--abslog` option will make zenserver log in json format to file
- Feature: Create release in Sentry and use `sentry_options_set_release` to associate the executable
- Bugfix: CompactBinary: Fixed LoadCompactBinary to gracefully handle read failures and sizes larger than the archive. From https://p4-swarm.epicgames.net/changes/21983905
+- Improvement: Logging: don't do formatting of messages the will not be logged
+- Improvement: Logging: Timing and upstream source information in upstream logging when debug level logging is enabled
## v0.1.5
- Bugfix: Don't fail entire request if GetCacheValue from Horde fails for a single value
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index 769b5e3f3..2b96f32fc 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -339,6 +339,17 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
return Package;
}
+bool
+ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage)
+{
+ if (IsPackageMessage(Response))
+ {
+ OutPackage = ParsePackageMessage(Response);
+ return true;
+ }
+ return OutPackage.TryLoad(Response);
+}
+
CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; })
{
}
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index 24ce0c85a..0265d8d1e 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -93,6 +93,8 @@ CbPackage ParsePackageMessage(
});
bool IsPackageMessage(IoBuffer Payload);
+bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage);
+
std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 0c519dc7e..61c1e2df1 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1389,8 +1389,8 @@ TEST_CASE("zcache.rpc")
if (Result.status_code == 200)
{
- CbPackage Response;
- if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())))
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ if (!Response.IsNull())
{
OutResult.Response = std::move(Response);
CHECK(OutResult.Result.Parse(OutResult.Response));
@@ -1651,8 +1651,8 @@ TEST_CASE("zcache.failing.upstream")
if (Result.status_code == 200)
{
- CbPackage Response;
- if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())))
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ if (!Response.IsNull())
{
OutResult.Response = std::move(Response);
CHECK(OutResult.Result.Parse(OutResult.Response));
@@ -2087,8 +2087,8 @@ TEST_CASE("zcache.rpc.allpolicies")
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed.");
- CbPackage Response;
- bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load.");
cacherequests::GetCacheRecordsResult RequestResult;
CHECK(RequestResult.Parse(Response));
@@ -2166,8 +2166,9 @@ TEST_CASE("zcache.rpc.allpolicies")
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed.");
- CbPackage Response;
- bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ IoBuffer MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ CbPackage Response = ParsePackageMessage(MessageBuffer);
+ bool Loaded = !Response.IsNull();
CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load.");
cacherequests::GetCacheValuesResult GetCacheValuesResult;
CHECK(GetCacheValuesResult.Parse(Response));
@@ -2234,8 +2235,8 @@ TEST_CASE("zcache.rpc.allpolicies")
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed.");
- CbPackage Response;
- bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ bool Loaded = !Response.IsNull();
CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
cacherequests::GetCacheChunksResult GetCacheChunksResult;
CHECK(GetCacheChunksResult.Parse(Response));
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index cd679d186..75fb8ac41 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1211,12 +1211,9 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
HttpStructuredCacheService::PutResult
@@ -1635,12 +1632,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
ResponseObject.EndArray();
ResponsePackage.SetObject(ResponseObject.Save());
- BinaryWriter MemStream;
- ResponsePackage.Save(MemStream);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
void
@@ -1748,12 +1742,9 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
void
@@ -1943,13 +1934,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ResponseObject.EndArray();
RpcResponse.SetObject(ResponseObject.Save());
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
namespace cache::detail {
@@ -2451,12 +2438,9 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view
RpcResponse.SetObject(Writer.Save());
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
void
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index c1c0395e7..610946385 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -15,6 +15,8 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenhttp/httpshared.h>
+
#include <zenstore/cidstore.h>
#include <auth/authmgr.h>
@@ -887,7 +889,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -899,12 +900,13 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (Results.Num() != Requests.size())
{
- ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from Upstream.");
}
else
{
@@ -921,6 +923,10 @@ namespace detail {
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream.");
+ }
}
for (CacheKeyRequest* Request : Requests)
@@ -1004,7 +1010,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -1016,12 +1021,13 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheValueRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream.");
}
else
{
@@ -1068,6 +1074,10 @@ namespace detail {
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream.");
+ }
}
for (CacheValueRequest* RequestPtr : CacheValueRequests)
@@ -1135,7 +1145,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -1147,12 +1156,13 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheChunkRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream.");
}
else
{
@@ -1199,6 +1209,10 @@ namespace detail {
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream.");
+ }
}
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)