aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp570
1 files changed, 385 insertions, 185 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 0df7472ac..baaf94dd0 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -14,8 +14,10 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
+#include <zenutil/basicfile.h>
#include <zenutil/cache/cache.h>
//#include "cachekey.h"
@@ -44,6 +46,183 @@ namespace zen {
using namespace std::literals;
+namespace cache::detail {
+ struct RecordedRequest
+ {
+ uint64_t Offset;
+ uint64_t Length;
+ ZenContentType ContentType;
+ };
+
+ const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+
+ struct RecordedRequestsWriter
+ {
+ void BeginWrite(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ std::filesystem::create_directories(m_BasePath);
+ }
+
+ void EndWrite()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BlockFiles.clear();
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+ }
+
+ uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry =
+ m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType});
+ if (Entry.Length < 1 * 1024 * 1024)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ }
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+
+ std::filesystem::path m_BasePath;
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ };
+
+ struct RecordedRequestsReader
+ {
+ uint64_t BeginRead(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0ull)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_IoBuffers.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ m_IoBuffers[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+ }
+ void EndRead() { m_IoBuffers.clear(); }
+
+ ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ {
+ if (RequestIndex >= m_Entries.size())
+ {
+ return ZenContentType::kUnknownContentType;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return ZenContentType::kUnknownContentType;
+ }
+ if (Entry.Offset != ~0ull)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_IoBuffers[BlockIndex], ChunkOffset, Entry.Length);
+ return Entry.ContentType;
+ }
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ return Entry.ContentType;
+ }
+
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ std::vector<IoBuffer> m_IoBuffers;
+ };
+
+ class DiskRequestRecorder : public IRequestRecorder
+ {
+ public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+ private:
+ virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer);
+ }
+ virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ RecordedRequestsWriter m_RecordedRequests;
+ };
+
+ class DiskRequestReplayer : public IRequestReplayer
+ {
+ public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ private:
+ virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; }
+
+ virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+ };
+
+} // namespace cache::detail
+
//////////////////////////////////////////////////////////////////////////
CachePolicy
@@ -77,7 +256,10 @@ struct PutRequestData
};
namespace {
- static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+ static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+ static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
+ static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
+ static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
struct HttpRequestData
{
@@ -326,6 +508,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheS
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
+ m_RequestRecorder.reset();
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -367,6 +550,40 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
return HandleRpcRequest(Request);
}
+ if (Key == HttpZCacheUtilStartRecording)
+ {
+ m_RequestRecorder.reset();
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+ std::string_view RecordPath = Params.GetValue("path");
+ m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath);
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+ if (Key == HttpZCacheUtilStopRecording)
+ {
+ m_RequestRecorder.reset();
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+ if (Key == HttpZCacheUtilReplayRecording)
+ {
+ m_RequestRecorder.reset();
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+ std::string_view RecordPath = Params.GetValue("path");
+ uint32_t ThreadCount = std::thread::hardware_concurrency();
+ if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<uint64_t>(Param))
+ {
+ ThreadCount = gsl::narrow<uint32_t>(Value.value());
+ }
+ }
+ cache::detail::DiskRequestReplayer Replayer(RecordPath);
+ ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+
HttpRequestData RequestData;
if (!HttpRequestParseRelativeUri(Key, RequestData))
{
@@ -1126,6 +1343,101 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Request.WriteResponse(ResponseCode);
}
+CbPackage
+HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags)
+{
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ OutAcceptMagic = Object["Accept"sv].AsUInt32();
+ OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+
+ const std::string_view Method = Object["Method"sv].AsString();
+
+ if (Method == "PutCacheRecords"sv)
+ {
+ return HandleRpcPutCacheRecords(Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ return HandleRpcGetCacheRecords(Object);
+ }
+ else if (Method == "PutCacheValues"sv)
+ {
+ return HandleRpcPutCacheValues(Package);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ return HandleRpcGetCacheValues(Object);
+ }
+ else if (Method == "GetCacheChunks"sv)
+ {
+ return HandleRpcGetCacheChunks(Object);
+ }
+ return CbPackage{};
+}
+
+void
+HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount)
+{
+ WorkerThreadPool WorkerPool(ThreadCount);
+ uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount();
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ Latch JobLatch(RequestCount);
+ ZEN_INFO("Replaying {} requests", RequestCount);
+ for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
+ {
+ WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
+ IoBuffer Body;
+ ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body);
+ if (Body)
+ {
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
+ }
+ }
+ JobLatch.CountDown();
+ });
+ }
+ while (!JobLatch.Wait(10000))
+ {
+ ZEN_INFO("Replayed {} of {} requests, elapsed {}",
+ RequestCount - JobLatch.Remaining(),
+ RequestCount,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+}
+
void
HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
@@ -1143,43 +1455,48 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == HttpContentType::kCbObject)
- {
- ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- const std::string_view Method = Object["Method"sv].AsString();
- if (Method == "PutCacheRecords"sv)
- {
- HandleRpcPutCacheRecords(AsyncRequest, Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- HandleRpcGetCacheRecords(AsyncRequest, Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- HandleRpcPutCacheValues(AsyncRequest, Package);
- }
- else if (Method == "GetCacheValues"sv)
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (RpcResult.IsNull())
{
- HandleRpcGetCacheValues(AsyncRequest, Object);
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return;
}
- else if (Method == "GetCacheChunks"sv)
+ if (AcceptMagic == kCbPkgMagic)
{
- HandleRpcGetCacheChunks(AsyncRequest, Object);
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
{
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->IRequestRecorder_RecordResponse(
+ RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
});
}
@@ -1190,15 +1507,13 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
}
-void
-HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
CbObjectView BatchObject = BatchRequest.GetObject();
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
- uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
-
CbObjectView Params = BatchObject["Params"sv].AsObjectView();
CachePolicy DefaultPolicy;
@@ -1206,7 +1521,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::vector<bool> Results;
@@ -1219,7 +1534,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CacheKey Key;
if (!GetRpcRequestCacheKey(KeyView, Key))
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
@@ -1228,13 +1543,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
if (Result == PutResult::Invalid)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
Results.push_back(Result == PutResult::Success);
}
if (Results.empty())
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbObjectWriter ResponseObject;
@@ -1247,21 +1562,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
HttpStructuredCacheService::PutResult
@@ -1354,17 +1655,13 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Success;
}
-void
-HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
struct ValueRequestData
@@ -1393,7 +1690,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
std::vector<RecordRequestData> Requests;
std::vector<size_t> UpstreamIndexes;
@@ -1427,7 +1724,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
CacheKey& Key = Request.Upstream.Key;
if (!GetRpcRequestCacheKey(KeyObject, Key))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
@@ -1520,7 +1817,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
if (Requests.empty())
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
if (!UpstreamIndexes.empty())
@@ -1693,42 +1990,21 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
ResponseObject.EndArray();
ResponsePackage.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- ResponsePackage,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- ResponsePackage.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return ResponsePackage;
}
-void
-HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest)
{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
-
- uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
@@ -1741,7 +2017,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CacheKey Key;
if (!GetRpcRequestCacheKey(KeyView, Key))
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
PolicyText = RequestObject["Policy"sv].AsString();
@@ -1774,7 +2050,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
else
{
ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
}
else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
@@ -1804,7 +2080,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
}
if (Results.empty())
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbObjectWriter ResponseObject;
@@ -1818,40 +2094,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
-void
-HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
-
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
struct RequestData
@@ -1876,7 +2133,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
PolicyText = RequestObject["Policy"sv].AsString();
@@ -1991,7 +2248,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (Requests.empty())
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbPackage RpcResponse;
@@ -2026,23 +2283,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ResponseObject.EndArray();
RpcResponse.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResponse,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
namespace cache::detail {
@@ -2081,13 +2322,11 @@ namespace cache::detail {
} // namespace cache::detail
-void
-HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
{
using namespace cache::detail;
- ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
-
std::string Namespace;
std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
@@ -2096,22 +2335,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
- uint32_t AcceptMagic = 0;
- uint16_t AcceptFlags = 0;
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(AcceptMagic,
- AcceptFlags,
- Namespace,
- RecordKeys,
- Records,
- RequestKeys,
- Requests,
- RecordRequests,
- ValueRequests,
- RpcRequest))
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
// For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
@@ -2125,13 +2353,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(AcceptMagic, AcceptFlags, Namespace, Requests, HttpRequest);
+ return WriteGetCacheChunksResponse(Namespace, Requests);
}
bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
- uint16_t& AcceptFlags,
- std::string& Namespace,
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
@@ -2144,9 +2370,6 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& Accept
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- AcceptFlags = RpcRequest["AcceptFlags"sv].AsUInt16(0u);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
@@ -2496,18 +2719,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
}
}
-void
-HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic,
- uint16_t AcceptFlags,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests,
- zen::HttpServerRequest& HttpRequest)
+CbPackage
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests)
{
using namespace cache::detail;
- RpcAcceptOptions AcceptOptions = static_cast<RpcAcceptOptions>(AcceptFlags);
- bool AllowFileReferences = EnumHasAllFlags(AcceptOptions, RpcAcceptOptions::kAllowLocalReferences);
-
CbPackage RpcResponse;
CbObjectWriter Writer;
@@ -2564,23 +2780,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
Writer.EndArray();
RpcResponse.SetObject(Writer.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResponse,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
void