From 10c141fece26f9946595028afb069cbee1502067 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 7 Dec 2022 10:55:57 +0100 Subject: Cache request record/replay (#198) This adds recording and playback of cache request with full data - both get and put operations can be replayed. Invoke via web request. `/z$/exec$/start-recording?` `/z$/exec$/stop-recording` `/z$/exec$/replay-recording?&` --- zenserver/cache/structuredcache.cpp | 570 ++++++++++++++++++++++++------------ 1 file changed, 385 insertions(+), 185 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0df7472ac..baaf94dd0 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -14,8 +14,10 @@ #include #include #include +#include #include #include +#include #include //#include "cachekey.h" @@ -44,6 +46,183 @@ namespace zen { using namespace std::literals; +namespace cache::detail { + struct RecordedRequest + { + uint64_t Offset; + uint64_t Length; + ZenContentType ContentType; + }; + + const uint64_t RecordedRequestBlockSize = 1ull << 31u; + + struct RecordedRequestsWriter + { + void BeginWrite(const std::filesystem::path& BasePath) + { + m_BasePath = BasePath; + std::filesystem::create_directories(m_BasePath); + } + + void EndWrite() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_BlockFiles.clear(); + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); + } + + uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = + m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType}); + if (Entry.Length < 1 * 1024 * 1024) + { + uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + } + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = m_ChunkOffset; + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + Lock.ReleaseNow(); + + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + + std::filesystem::path m_BasePath; + mutable RwLock m_Lock; + std::vector m_Entries; + std::vector> m_BlockFiles; + uint64_t m_ChunkOffset; + }; + + struct RecordedRequestsReader + { + uint64_t BeginRead(const std::filesystem::path& BasePath) + { + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0ull) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_IoBuffers.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + m_IoBuffers[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); + } + void EndRead() { m_IoBuffers.clear(); } + + ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + { + if (RequestIndex >= m_Entries.size()) + { + return ZenContentType::kUnknownContentType; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return ZenContentType::kUnknownContentType; + } + if (Entry.Offset != ~0ull) + { + uint32_t BlockIndex = gsl::narrow((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_IoBuffers[BlockIndex], ChunkOffset, Entry.Length); + return Entry.ContentType; + } + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + return Entry.ContentType; + } + + std::filesystem::path m_BasePath; + std::vector m_Entries; + std::vector> m_BlockFiles; + std::vector m_IoBuffers; + }; + + class DiskRequestRecorder : public IRequestRecorder + { + public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + + private: + virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer); + } + virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; + }; + + class DiskRequestReplayer : public IRequestReplayer + { + public: + DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + + private: + virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; } + + virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; + }; + +} // namespace cache::detail + ////////////////////////////////////////////////////////////////////////// CachePolicy @@ -77,7 +256,10 @@ struct PutRequestData }; namespace { - static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; + static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; + static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; + static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; + static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; struct HttpRequestData { @@ -326,6 +508,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheS HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); + m_RequestRecorder.reset(); m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); @@ -367,6 +550,40 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) return HandleRpcRequest(Request); } + if (Key == HttpZCacheUtilStartRecording) + { + m_RequestRecorder.reset(); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + std::string_view RecordPath = Params.GetValue("path"); + m_RequestRecorder = std::make_unique(RecordPath); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + if (Key == HttpZCacheUtilStopRecording) + { + m_RequestRecorder.reset(); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + if (Key == HttpZCacheUtilReplayRecording) + { + m_RequestRecorder.reset(); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + std::string_view RecordPath = Params.GetValue("path"); + uint32_t ThreadCount = std::thread::hardware_concurrency(); + if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) + { + if (auto Value = ParseInt(Param)) + { + ThreadCount = gsl::narrow(Value.value()); + } + } + cache::detail::DiskRequestReplayer Replayer(RecordPath); + ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + HttpRequestData RequestData; if (!HttpRequestParseRelativeUri(Key, RequestData)) { @@ -1126,6 +1343,101 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request, Request.WriteResponse(ResponseCode); } +CbPackage +HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags) +{ + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == ZenContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + OutAcceptMagic = Object["Accept"sv].AsUInt32(); + OutAcceptFlags = static_cast(Object["AcceptFlags"sv].AsUInt16(0u)); + + const std::string_view Method = Object["Method"sv].AsString(); + + if (Method == "PutCacheRecords"sv) + { + return HandleRpcPutCacheRecords(Package); + } + else if (Method == "GetCacheRecords"sv) + { + return HandleRpcGetCacheRecords(Object); + } + else if (Method == "PutCacheValues"sv) + { + return HandleRpcPutCacheValues(Package); + } + else if (Method == "GetCacheValues"sv) + { + return HandleRpcGetCacheValues(Object); + } + else if (Method == "GetCacheChunks"sv) + { + return HandleRpcGetCacheChunks(Object); + } + return CbPackage{}; +} + +void +HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount) +{ + WorkerThreadPool WorkerPool(ThreadCount); + uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount(); + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + Latch JobLatch(RequestCount); + ZEN_INFO("Replaying {} requests", RequestCount); + for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) + { + WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { + IoBuffer Body; + ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body); + if (Body) + { + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + if (AcceptMagic == kCbPkgMagic) + { + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResult, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences + : FormatFlags::kDefault); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); + } + } + JobLatch.CountDown(); + }); + } + while (!JobLatch.Wait(10000)) + { + ZEN_INFO("Replayed {} of {} requests, elapsed {}", + RequestCount - JobLatch.Remaining(), + RequestCount, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } +} + void HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { @@ -1143,43 +1455,48 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { - CbPackage Package; - CbObjectView Object; - CbObject ObjectBuffer; - if (ContentType == HttpContentType::kCbObject) - { - ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); - Object = ObjectBuffer; - } - else - { - Package = ParsePackageMessage(Body); - Object = Package.GetObject(); - } - const std::string_view Method = Object["Method"sv].AsString(); - if (Method == "PutCacheRecords"sv) - { - HandleRpcPutCacheRecords(AsyncRequest, Package); - } - else if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, Object); - } - else if (Method == "PutCacheValues"sv) - { - HandleRpcPutCacheValues(AsyncRequest, Package); - } - else if (Method == "GetCacheValues"sv) + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + if (RpcResult.IsNull()) { - HandleRpcGetCacheValues(AsyncRequest, Object); + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + return; } - else if (Method == "GetCacheChunks"sv) + if (AcceptMagic == kCbPkgMagic) { - HandleRpcGetCacheChunks(AsyncRequest, Object); + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResult, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences + : FormatFlags::kDefault); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + BinaryWriter MemStream; + RpcResult.Save(MemStream); + + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->IRequestRecorder_RecordResponse( + RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } }); } @@ -1190,15 +1507,13 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } } -void -HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +CbPackage +HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); CbObjectView BatchObject = BatchRequest.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32(); - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); CachePolicy DefaultPolicy; @@ -1206,7 +1521,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector Results; @@ -1219,7 +1534,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req CacheKey Key; if (!GetRpcRequestCacheKey(KeyView, Key)) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; @@ -1228,13 +1543,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req if (Result == PutResult::Invalid) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } Results.push_back(Result == PutResult::Success); } if (Results.empty()) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } CbObjectWriter ResponseObject; @@ -1247,21 +1562,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - - if (AcceptMagic == kCbPkgMagic) - { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault); - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } + return RpcResponse; } HttpStructuredCacheService::PutResult @@ -1354,17 +1655,13 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Success; } -void -HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); - RpcAcceptOptions AcceptFlags = static_cast(RpcRequest["AcceptFlags"sv].AsUInt16(0u)); - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); struct ValueRequestData @@ -1393,7 +1690,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } std::vector Requests; std::vector UpstreamIndexes; @@ -1427,7 +1724,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt CacheKey& Key = Request.Upstream.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); @@ -1520,7 +1817,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } if (Requests.empty()) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } if (!UpstreamIndexes.empty()) @@ -1693,42 +1990,21 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } ResponseObject.EndArray(); ResponsePackage.SetObject(ResponseObject.Save()); - - if (AcceptMagic == kCbPkgMagic) - { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - ResponsePackage, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - ResponsePackage.Save(MemStream); - - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } + return ResponsePackage; } -void -HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +CbPackage +HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest) { - ZEN_TRACE_CPU("Z$::RpcPutCacheValues"); CbObjectView BatchObject = BatchRequest.GetObject(); - ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); - - uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32(); - - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } std::vector Results; for (CbFieldView RequestField : Params["Requests"sv]) @@ -1741,7 +2017,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ CacheKey Key; if (!GetRpcRequestCacheKey(KeyView, Key)) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } PolicyText = RequestObject["Policy"sv].AsString(); @@ -1774,7 +2050,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ else { ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) @@ -1804,7 +2080,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ } if (Results.empty()) { - return Request.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } CbObjectWriter ResponseObject; @@ -1818,40 +2094,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); - if (AcceptMagic == kCbPkgMagic) - { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault); - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } + return RpcResponse; } -void -HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); - RpcAcceptOptions AcceptFlags = static_cast(RpcRequest["AcceptFlags"sv].AsUInt16(0u)); - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } struct RequestData @@ -1876,7 +2133,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } PolicyText = RequestObject["Policy"sv].AsString(); @@ -1991,7 +2248,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (Requests.empty()) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } CbPackage RpcResponse; @@ -2026,23 +2283,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); - - if (AcceptMagic == kCbPkgMagic) - { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResponse, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } + return RpcResponse; } namespace cache::detail { @@ -2081,13 +2322,11 @@ namespace cache::detail { } // namespace cache::detail -void -HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) { using namespace cache::detail; - ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); - std::string Namespace; std::vector RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector Records; // Scratch-space data about a Record when fulfilling RecordRequests @@ -2096,22 +2335,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key std::vector ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector UpstreamChunks; // ChunkRequests that we need to send to the upstream - uint32_t AcceptMagic = 0; - uint16_t AcceptFlags = 0; // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(AcceptMagic, - AcceptFlags, - Namespace, - RecordKeys, - Records, - RequestKeys, - Requests, - RecordRequests, - ValueRequests, - RpcRequest)) + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { - return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + return CbPackage{}; } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we @@ -2125,13 +2353,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(AcceptMagic, AcceptFlags, Namespace, Requests, HttpRequest); + return WriteGetCacheChunksResponse(Namespace, Requests); } bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic, - uint16_t& AcceptFlags, - std::string& Namespace, +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, std::vector& RecordKeys, std::vector& Records, std::vector& RequestKeys, @@ -2144,9 +2370,6 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& Accept ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); - AcceptFlags = RpcRequest["AcceptFlags"sv].AsUInt16(0u); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; @@ -2496,18 +2719,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names } } -void -HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic, - uint16_t AcceptFlags, - std::string_view Namespace, - std::vector& Requests, - zen::HttpServerRequest& HttpRequest) +CbPackage +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector& Requests) { using namespace cache::detail; - RpcAcceptOptions AcceptOptions = static_cast(AcceptFlags); - bool AllowFileReferences = EnumHasAllFlags(AcceptOptions, RpcAcceptOptions::kAllowLocalReferences); - CbPackage RpcResponse; CbObjectWriter Writer; @@ -2564,23 +2780,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept Writer.EndArray(); RpcResponse.SetObject(Writer.Save()); - - if (AcceptMagic == kCbPkgMagic) - { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResponse, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); - HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - HttpRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } + return RpcResponse; } void -- cgit v1.2.3